Creating kafka_table must be done under the default dialect.

Regardless of the dialect, the table is persisted to the Hive metastore (unless you use the temporary table syntax).
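
As a sketch, the two DDL variants differ only in the TEMPORARY keyword (the schema and connector options below are illustrative, abbreviated from the DDL later in this thread):

```sql
-- Persisted: with a HiveCatalog as the current catalog, the metadata is
-- stored in the Hive metastore; IF NOT EXISTS avoids errors on re-runs.
CREATE TABLE IF NOT EXISTS kafka_table (
    uid VARCHAR,
    age INT
) WITH (
    'connector.type' = 'kafka',
    'format.type' = 'json'
);

-- Session-scoped: never written to the metastore, dropped when the session ends.
CREATE TEMPORARY TABLE kafka_table_tmp (
    uid VARCHAR,
    age INT
) WITH (
    'connector.type' = 'kafka',
    'format.type' = 'json'
);
```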

Best,
Jingsong

On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach <[email protected]> wrote:

> When I created kafka_table it was under the default dialect; after switching to HiveCatalog, neither the WATERMARK nor the WITH syntax was supported.
> If a table is created under the default dialect, is it only valid for the temporary session?
>
> On 2020-07-13 19:27:44, "Jingsong Li" <[email protected]> wrote:
> >Hi,
> >
> >Question 1:
> >
> >It works as long as the current catalog is a HiveCatalog.
> >In theory the Kafka table is also stored in the Hive metastore. If you want to avoid the error, you can use CREATE TABLE XXX IF NOT EXISTS.
> >
> >To clarify: what does "can't see it" mean? Could you try the Kafka table on its own - does it disappear after a restart?
> >
> >Question 2:
> >
> >A table created with the filesystem connector is a filesystem table; it is
> >unrelated to the Hive metastore. You need to use the filesystem table DDL syntax [1].
> >
> >A filesystem table writes its data directly to the file system, and the format is Hive-compatible, so as long as the write path is the path of some Hive table, the data can be queried on the Hive side.
> >However, its partition commit does not support the metastore, so there is no built-in implementation that automatically adds
> >partitions to Hive; you need to implement a custom partition-commit-policy.
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
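
For reference, a filesystem table per [1] is declared under the default dialect with a regular WITH clause (path and schema below are illustrative):

```sql
CREATE TABLE fs_table (
    user_id STRING,
    age INT,
    dt STRING,
    hr STRING
) PARTITIONED BY (dt, hr) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs:///path/to/the/hive/table/dir',
    'format' = 'parquet',
    'sink.partition-commit.delay' = '0s',
    'sink.partition-commit.policy.kind' = 'success-file'
);
```

Note that 'metastore' is omitted from the commit policy, since the filesystem connector cannot add partitions to the Hive metastore automatically.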
> >
> >Best,
> >Jingsong
> >
> >On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach <[email protected]> wrote:
> >
> >> Embarrassing....
> >> I had two projects open and had been editing the wrong one; I can now successfully query the data from Hive. Thanks for the community's warm replies, @Jingsong Li, @夏帅
> >> I've watched Jingsong's video from the group chat several times over the past two days - sincere thanks!
> >> Two more questions:
> >> Question 1:
> >> The kafka_table I created is visible from neither Hive nor the Flink SQL client. Also, every rerun of the program fails unless I drop hive_table first; once hive_table1 is dropped it runs. But kafka_table never needs to be dropped for the program to run. So is the kafka_table I created a temporary table, and is only hive_table stored in the metastore?
> >>
> >> Question 2:
> >> A helpful community member just replied that a Hive table can also be created with the filesystem connector instead of a HiveCatalog. I tried it and got an error:
> >> java.util.concurrent.CompletionException:
> >>
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> >> Could not execute application.
> >>         at
> >>
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> [?:1.8.0_161]
> >>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >> [?:1.8.0_161]
> >>         at
> >>
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
> >> [qile-data-flow-1.0.jar:?]
> >>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> >> [qile-data-flow-1.0.jar:?]
> >>         at
> >>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> >> [qile-data-flow-1.0.jar:?]
> >>         at
> >> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> [qile-data-flow-1.0.jar:?]
> >>         at
> >>
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> [qile-data-flow-1.0.jar:?]
> >>         at
> >> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> [qile-data-flow-1.0.jar:?]
> >>         at
> >>
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> [qile-data-flow-1.0.jar:?]
> >> Caused by:
> >>
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> >> Could not execute application.
> >>         ... 11 more
> >> Caused by: org.apache.flink.client.program.ProgramInvocationException:
> The
> >> main method caused an error: Unable to create a sink for writing table
> >> 'default_catalog.default_database.hive_table1'.
> >>
> >> Table options are:
> >>
> >> 'connector'='filesystem'
> >> 'hive.storage.file-format'='parquet'
> >> 'is_generic'='false'
> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
> >> 'sink.partition-commit.delay'='0s'
> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
> >>         at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         ... 10 more
> >> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> >> create a sink for writing table
> >> 'default_catalog.default_database.hive_table1'.
> >>
> >> Table options are:
> >>
> >> 'connector'='filesystem'
> >> 'hive.storage.file-format'='parquet'
> >> 'is_generic'='false'
> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
> >> 'sink.partition-commit.delay'='0s'
> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
> >>         at
> >>
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >>
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> ~[?:1.8.0_161]
> >>         at java.lang.reflect.Method.invoke(Method.java:498)
> ~[?:1.8.0_161]
> >>         at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         ... 10 more
> >> Caused by: org.apache.flink.table.api.ValidationException: Cannot
> discover
> >> a connector using option ''connector'='filesystem''.
> >>         at
> >>
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >>
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> ~[?:1.8.0_161]
> >>         at java.lang.reflect.Method.invoke(Method.java:498)
> ~[?:1.8.0_161]
> >>         at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         ... 10 more
> >> Caused by: org.apache.flink.table.api.ValidationException: Could not
> find
> >> any factory for identifier 'filesystem' that implements
> >> 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the
> classpath.
> >>
> >> Available factory identifiers are:
> >>
> >> blackhole
> >> hbase-1.4
> >> jdbc
> >> kafka
> >> print
> >>         at
> >>
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >>
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> ~[?:1.8.0_161]
> >>         at java.lang.reflect.Method.invoke(Method.java:498)
> ~[?:1.8.0_161]
> >>         at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         ... 10 more
> >>
> >> query:
> >>
> >>
> >>
> >>
> >> val streamExecutionEnv =
> StreamExecutionEnvironment.getExecutionEnvironment
> >>
> >>
> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> >>     streamExecutionEnv.enableCheckpointing(5 * 1000,
> >> CheckpointingMode.EXACTLY_ONCE)
> >>     streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 *
> 1000)
> >>
> >>     val blinkEnvSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >>     val streamTableEnv =
> StreamTableEnvironment.create(streamExecutionEnv,
> >> blinkEnvSettings)
> >>
> >>
> >>
> >>     streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> >>     streamTableEnv.executeSql(
> >>       """
> >>         |
> >>         |
> >>         |CREATE TABLE hive_table (
> >>         |  user_id STRING,
> >>         |  age INT
> >>         |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
> >> TBLPROPERTIES (
> >>         |  'connector'='filesystem',
> >>         |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> >>         |  'sink.partition-commit.delay'='0s',
> >>         |  'sink.partition-commit.policy.kind'='metastore,success-file'
> >>         |)
> >>         |
> >>         |""".stripMargin)
> >>
> >>     streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
> >>     streamTableEnv.executeSql(
> >>       """
> >>         |
> >>         |CREATE TABLE kafka_table (
> >>         |    uid VARCHAR,
> >>         |    -- uid BIGINT,
> >>         |    sex VARCHAR,
> >>         |    age INT,
> >>         |    created_time TIMESTAMP(3),
> >>         |    WATERMARK FOR created_time as created_time - INTERVAL '3'
> >> SECOND
> >>         |) WITH (
> >>         |    'connector.type' = 'kafka',
> >>         |    'connector.version' = 'universal',
> >>         |     'connector.topic' = 'user',
> >>         |    -- 'connector.topic' = 'user_long',
> >>         |    'connector.startup-mode' = 'latest-offset',
> >>         |    'connector.properties.zookeeper.connect' =
> >> 'cdh1:2181,cdh2:2181,cdh3:2181',
> >>         |    'connector.properties.bootstrap.servers' =
> >> 'cdh1:9092,cdh2:9092,cdh3:9092',
> >>         |    'connector.properties.group.id' = 'user_flink',
> >>         |    'format.type' = 'json',
> >>         |    'format.derive-schema' = 'true'
> >>         |)
> >>         |""".stripMargin)
> >>
> >>
> >>     streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> >>
> >>     streamTableEnv.executeSql(
> >>       """
> >>         |
> >>         |INSERT INTO hive_table
> >>         |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'),
> >> DATE_FORMAT(created_time, 'HH')
> >>         |FROM kafka_table
> >>         |
> >>         |""".stripMargin)
> >>
> >>     streamTableEnv.executeSql(
> >>       """
> >>         |
> >>         |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='18'
> >>         |
> >>         |""".stripMargin)
> >>       .print()
> >>
> >> On 2020-07-13 17:52:54, "Jingsong Li" <[email protected]> wrote:
> >> >Could you paste the complete program again?
> >> >
> >> >Best,
> >> >Jingsong
> >> >
> >> >On Mon, Jul 13, 2020 at 5:46 PM Zhou Zach <[email protected]> wrote:
> >> >
> >> >> Hi,
> >> >>
> >> >>
> >> >> I have now changed it to:
> >> >> 'sink.partition-commit.delay'='0s'
> >> >>
> >> >>
> >> >> More than 20 checkpoints have completed and 20-odd HDFS files have been produced,
> >> >> but the Hive table still shows no data.
> >> >>
> >> >> On 2020-07-13 17:23:34, "夏帅" <[email protected]> wrote:
> >> >>
> >> >> Hi,
> >> >> You have set SINK_PARTITION_COMMIT_DELAY to 1 hour.
> >> >>
> >> >>
> >> >> ------------------------------------------------------------------
> >> >> From: Zhou Zach <[email protected]>
> >> >> Sent: Monday, July 13, 2020, 17:09
> >> >> To: user-zh <[email protected]>
> >> >> Subject: Re:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.
> >> >>
> >> >>
> >> >> Checkpointing is enabled:
> >> >> val streamExecutionEnv =
> >> StreamExecutionEnvironment.getExecutionEnvironment
> >> >>
> >> >>
> >>
> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> >> >> streamExecutionEnv.enableCheckpointing(5 * 1000,
> >> >> CheckpointingMode.EXACTLY_ONCE)
> >> >> streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 *
> 1000)
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> The interval is 5s and the timeout 10s, but after waiting over 2 minutes, with a dozen or so files written to HDFS, querying Hive still returns no data.
> >> >>
> >> >> On 2020-07-13 16:52:16, "Jingsong Li" <[email protected]> wrote:
> >> >> >Is checkpointing enabled? What is the delay set to?
> >> >> >
> >> >> >The partition is added after the checkpoint completes plus the delay.
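
That timing is controlled by the table options (the DDL below mirrors the one from this thread; values are illustrative): with these settings, a partition becomes visible once a checkpoint covering its data completes, plus the configured delay.

```sql
CREATE TABLE hive_table (
    user_id STRING,
    age INT
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
    'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
    'sink.partition-commit.trigger' = 'partition-time',
    -- with '0s' the partition is committed right after the checkpoint completes;
    -- with '1 h' the commit waits an extra hour past the partition time
    'sink.partition-commit.delay' = '0s',
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
```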
> >> >> >
> >> >> >Best,
> >> >> >Jingsong
> >> >> >
> >> >> >On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach <[email protected]>
> wrote:
> >> >> >
> >> >> >> Hi,
> >> >> >> Following your hint I added a HiveCatalog, and data is now written to HDFS successfully. But why does querying the Hive table directly via Hue return no data? Do I have to manually add partitions to the Hive table? I have currently set
> >> >> >> 'sink.partition-commit.policy.kind'='metastore'
> >> >> >>
> >> >> >> At 2020-07-13 15:01:28, "Jingsong Li" <[email protected]>
> >> wrote:
> >> >> >> >Hi,
> >> >> >> >
> >> >> >> >Are you using a HiveCatalog? Hive tables and the Hive dialect must be used together with a HiveCatalog.
> >> >> >> >
> >> >> >> >Otherwise you can only use the filesystem connector. If the filesystem connector also reports an error, please paste the error message.
> >> >> >> >
> >> >> >> >Best,
> >> >> >> >Jingsong
> >> >> >> >
> >> >> >> >On Mon, Jul 13, 2020 at 2:58 PM Zhou Zach <[email protected]>
> >> wrote:
> >> >> >> >
> >> >> >> >> What should the connector be set to for a Flink 1.11 sink into a Hive table? I tried setting
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1
> >> >> >> >> h','sink.partition-commit.policy.kind'='success-file');
> >> >> >> >> and it also reported an error.
> >> >> >> >> query:
> >> >> >> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> >> >> >> >> streamTableEnv.executeSql(
> >> >> >> >> """
> >> >> >> >>     |
> >> >> >> >>     |
> >> >> >> >>     |CREATE TABLE hive_table (
> >> >> >> >>     |  user_id STRING,
> >> >> >> >>     |  age INT
> >> >> >> >>     |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
> >> >> >> >> TBLPROPERTIES (
> >> >> >> >>     |  'partition.time-extractor.timestamp-pattern'='$dt
> >> $hr:00:00',
> >> >> >> >>     |  'sink.partition-commit.trigger'='partition-time',
> >> >> >> >>     |  'sink.partition-commit.delay'='1 h',
> >> >> >> >>     |
> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
> >> >> >> >>     |)
> >> >> >> >>     |
> >> >> >> >>     |""".stripMargin)
> >> >> >> >>
> >> >> >> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
> >> >> >> >> streamTableEnv.executeSql(
> >> >> >> >> """
> >> >> >> >>     |
> >> >> >> >>     |CREATE TABLE kafka_table (
> >> >> >> >>     |    uid VARCHAR,
> >> >> >> >>     |    -- uid BIGINT,
> >> >> >> >>     |    sex VARCHAR,
> >> >> >> >>     |    age INT,
> >> >> >> >>     |    created_time TIMESTAMP(3),
> >> >> >> >>     |    WATERMARK FOR created_time as created_time - INTERVAL
> '3'
> >> >> >> SECOND
> >> >> >> >>     |) WITH (
> >> >> >> >>     |    'connector.type' = 'kafka',
> >> >> >> >>     |    'connector.version' = 'universal',
> >> >> >> >>     |     'connector.topic' = 'user',
> >> >> >> >>     |    -- 'connector.topic' = 'user_long',
> >> >> >> >>     |    'connector.startup-mode' = 'latest-offset',
> >> >> >> >>     |    'connector.properties.zookeeper.connect' =
> >> >> >> >> 'cdh1:2181,cdh2:2181,cdh3:2181',
> >> >> >> >>     |    'connector.properties.bootstrap.servers' =
> >> >> >> >> 'cdh1:9092,cdh2:9092,cdh3:9092',
> >> >> >> >>     |    'connector.properties.group.id' = 'user_flink',
> >> >> >> >>     |    'format.type' = 'json',
> >> >> >> >>     |    'format.derive-schema' = 'true'
> >> >> >> >>     |)
> >> >> >> >>     |""".stripMargin)
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >> streamTableEnv.executeSql(
> >> >> >> >> """
> >> >> >> >>     |
> >> >> >> >>     |INSERT INTO hive_table
> >> >> >> >>     |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'),
> >> >> >> >> DATE_FORMAT(created_time, 'HH')
> >> >> >> >>     |FROM kafka_table
> >> >> >> >>     |
> >> >> >> >>     |""".stripMargin)
> >> >> >> >>
> >> >> >> >> streamTableEnv.executeSql(
> >> >> >> >> """
> >> >> >> >>     |
> >> >> >> >>     |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13'
> >> >> >> >>     |
> >> >> >> >>     |""".stripMargin)
> >> >> >> >> .print()
> >> >> >> >> 错误栈:
> >> >> >> >> Exception in thread "main"
> >> >> >> org.apache.flink.table.api.ValidationException:
> >> >> >> >> Unable to create a sink for writing table
> >> >> >> >> 'default_catalog.default_database.hive_table'.
> >> >> >> >>
> >> >> >> >> Table options are:
> >> >> >> >>
> >> >> >> >> 'hive.storage.file-format'='parquet'
> >> >> >> >> 'is_generic'='false'
> >> >> >> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
> >> >> >> >> 'sink.partition-commit.delay'='1 h'
> >> >> >> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
> >> >> >> >> 'sink.partition-commit.trigger'='partition-time'
> >> >> >> >>         at
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
> >> >> >> >>         at
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
> >> >> >> >>         at
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> >> >> >> >>         at
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> >> >> >>         at
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> >> >> >>         at
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> >> >> >>         at
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> >> >> >>         at
> >> >> scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >> >> >> >>         at
> >> >> >> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >> >> >> >>         at
> >> >> >> >>
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >> >> >> >>         at
> >> >> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >> >> >> >>         at
> >> >> >> >>
> >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >> >> >> >>         at
> >> >> >> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >> >> >> >>         at
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> >> >> >> >>         at
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
> >> >> >> >>         at
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
> >> >> >> >>         at
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
> >> >> >> >>         at
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> >> >> >> >>         at
> >> >> >> >>
> org.rabbit.sql.FromKafkaSinkHive$.main(FromKafkaSinkHive.scala:65)
> >> >> >> >>         at
> >> >> >> org.rabbit.sql.FromKafkaSinkHive.main(FromKafkaSinkHive.scala)
> >> >> >> >> Caused by: org.apache.flink.table.api.ValidationException:
> Table
> >> >> options
> >> >> >> >> do not contain an option key 'connector' for discovering a
> >> connector.
> >> >> >> >>         at
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
> >> >> >> >>         at
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
> >> >> >> >>         ... 19 more
> >> >> >> >>
> >> >> >> >>
> >> >> >> >
> >> >> >> >--
> >> >> >> >Best, Jingsong Lee
> >> >> >>
> >> >> >
> >> >> >
> >> >> >--
> >> >> >Best, Jingsong Lee
> >> >>
> >> >
> >> >
> >> >--
> >> >Best, Jingsong Lee
> >>
> >
> >
> >--
> >Best, Jingsong Lee
>


-- 
Best, Jingsong Lee
