[jira] [Created] (FLINK-31667) Should not enforce attached mode in RemoteEnvironment & RemoteStreamEnvironment
Jeff Zhang created FLINK-31667: -- Summary: Should not enforce attached mode in RemoteEnvironment & RemoteStreamEnvironment Key: FLINK-31667 URL: https://issues.apache.org/jira/browse/FLINK-31667 Project: Flink Issue Type: Improvement Components: Client / Job Submission Reporter: Jeff Zhang Currently, Flink enforces attached mode in `RemoteEnvironment` & `RemoteStreamEnvironment` even when the user tries to use detached mode. This doesn't make sense to me. * https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java#L151 * https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java#L211 -- This message was sent by Atlassian Jira (v8.20.10#820010)
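The enforcement the report describes can be sketched in plain Java. This is an illustrative stand-in, not Flink code: the `Map`-based config, the key name `execution.attached`, and the method name are all assumptions made for the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the behavior the issue describes: the user requests
// detached mode, but the remote environment unconditionally forces attached
// mode, silently discarding the user's choice. Key and method names are
// illustrative stand-ins, not Flink's actual implementation.
class AttachedModeSketch {
    static final String ATTACHED = "execution.attached";

    static Map<String, Boolean> effectiveConfig(Map<String, Boolean> userConf) {
        Map<String, Boolean> effective = new HashMap<>(userConf);
        // RemoteEnvironment-style enforcement: overwrite whatever the user set.
        effective.put(ATTACHED, true);
        return effective;
    }

    public static void main(String[] args) {
        Map<String, Boolean> userConf = new HashMap<>();
        userConf.put(ATTACHED, false); // user asks for detached mode
        System.out.println("attached = " + effectiveConfig(userConf).get(ATTACHED)); // attached = true
    }
}
```

The point of the sketch is the overwrite: whatever the user configured, the effective configuration always ends up attached.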
[jira] [Created] (FLINK-21607) Twitter table source connector
Jeff Zhang created FLINK-21607: -- Summary: Twitter table source connector Key: FLINK-21607 URL: https://issues.apache.org/jira/browse/FLINK-21607 Project: Flink Issue Type: New Feature Affects Versions: 1.12.2 Reporter: Jeff Zhang It would be nice to have a Flink Twitter table source connector. This would be especially useful for demoing Flink SQL examples. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21607) Twitter table source connector
[ https://issues.apache.org/jira/browse/FLINK-21607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-21607: --- Description: It would be nice to have such flink twitter table source connector. This is especially useful for demo flink sql examples. was: It would be nice to have such flink twitter table source connector. This is especially useful to demo flink sql examples. > Twitter table source connector > -- > > Key: FLINK-21607 > URL: https://issues.apache.org/jira/browse/FLINK-21607 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.12.2 >Reporter: Jeff Zhang >Priority: Major > > It would be nice to have such flink twitter table source connector. This is > especially useful for demo flink sql examples. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17788) scala shell in yarn mode is broken
[ https://issues.apache.org/jira/browse/FLINK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-17788: --- Affects Version/s: 1.10.1 > scala shell in yarn mode is broken > -- > > Key: FLINK-17788 > URL: https://issues.apache.org/jira/browse/FLINK-17788 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.10.1, 1.11.0 >Reporter: Jeff Zhang >Priority: Major > Labels: pull-request-available > > When I start scala shell in yarn mode, one yarn app will be launched, and > after I write some flink code and trigger a flink job, another yarn app will > be launched but would failed to launch due to some conflicts. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18167) Flink Job hangs there when one vertex is failed and another is cancelled.
Jeff Zhang created FLINK-18167: -- Summary: Flink Job hangs there when one vertex is failed and another is cancelled. Key: FLINK-18167 URL: https://issues.apache.org/jira/browse/FLINK-18167 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.10.0 Reporter: Jeff Zhang Attachments: image-2020-06-06-15-39-35-441.png After I call cancel with savepoint, the cancel operation failed. The following is what I see on the client side.
{code:java}
WARN [2020-06-06 13:45:16,003] ({Thread-1241} JobManager.java[cancelJob]:137) - Fail to cancel job 7e5492f35c1a7f5dad7c805ba943ea52 that is associated with paragraph paragraph_1586733868269_783581378
java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
	at org.apache.zeppelin.flink.JobManager.cancelJob(JobManager.java:129)
	at org.apache.zeppelin.flink.FlinkScalaInterpreter.cancel(FlinkScalaInterpreter.scala:648)
	at org.apache.zeppelin.flink.FlinkInterpreter.cancel(FlinkInterpreter.java:101)
	at org.apache.zeppelin.interpreter.LazyOpenInterpreter.cancel(LazyOpenInterpreter.java:119)
	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer.lambda$cancel$1(RemoteInterpreterServer.java:800)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
	at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$stopWithSavepoint$9(SchedulerBase.java:873)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerSynchronousSavepoint$0(CheckpointCoordinator.java:428)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$1(CheckpointCoordinator.java:457)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at
[jira] [Commented] (FLINK-17788) scala shell in yarn mode is broken
[ https://issues.apache.org/jira/browse/FLINK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17120871#comment-17120871 ] Jeff Zhang commented on FLINK-17788: [~kkl0u] The second yarn app fails to launch due to a port conflict; here's what I see. But anyway, the scala shell should not launch another yarn app; instead it should use only one yarn session cluster.
{code}
2020-06-01 17:09:18,860 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting YarnJobClusterEntrypoint down with application status FAILED. Diagnostics
org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent.
	at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:255)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:216)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:516)
	at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:89)
Caused by: java.net.BindException: Could not start rest endpoint on any port in port range 57574
	at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:222)
	at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:163)
	... 9 more
{code}
> scala shell in yarn mode is broken > -- > > Key: FLINK-17788 > URL: https://issues.apache.org/jira/browse/FLINK-17788 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.11.0 >Reporter: Jeff Zhang >Priority: Major > Labels: pull-request-available > > When I start scala shell in yarn mode, one yarn app will be launched, and > after I write some flink code and trigger a flink job, another yarn app will > be launched but would failed to launch due to some conflicts. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-17788) scala shell in yarn mode is broken
[ https://issues.apache.org/jira/browse/FLINK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118249#comment-17118249 ] Jeff Zhang edited comment on FLINK-17788 at 5/28/20, 3:09 AM: -- [~kkl0u] Yes, each time I call execute(), the scala shell creates a new flink session cluster. But the session cluster actually fails to launch; I haven't dug into it further, but I suspect it is due to some conflict with the previous flink session cluster. was (Author: zjffdu): [~kkl0u] Yes,each time when I call execute(), scala shell will create a new flink session cluster. But actually the session cluster will fail to launch, I havn't digger into it further, I suspect it is due to some conflict with the preview flink session cluster. > scala shell in yarn mode is broken > -- > > Key: FLINK-17788 > URL: https://issues.apache.org/jira/browse/FLINK-17788 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.11.0 >Reporter: Jeff Zhang >Priority: Major > Labels: pull-request-available > > When I start scala shell in yarn mode, one yarn app will be launched, and > after I write some flink code and trigger a flink job, another yarn app will > be launched but would failed to launch due to some conflicts. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17944) Wrong output in sql client's table mode
[ https://issues.apache.org/jira/browse/FLINK-17944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-17944: --- Issue Type: Bug (was: Improvement) > Wrong output in sql client's table mode > --- > > Key: FLINK-17944 > URL: https://issues.apache.org/jira/browse/FLINK-17944 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.11.0 >Reporter: Jeff Zhang >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > > > When I run the following sql example, I get the wrong output > {code:java} > SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), > ('Bob')) AS NameTable(name) GROUP BY name; {code} > > {code:java} > Bob 1 > Alice 1 > Greg 1 >Bob 2 {code} > This is due to we add kind in Row, so the sematics of equals method changes -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-17788) scala shell in yarn mode is broken
[ https://issues.apache.org/jira/browse/FLINK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118249#comment-17118249 ] Jeff Zhang edited comment on FLINK-17788 at 5/28/20, 2:45 AM: -- [~kkl0u] Yes,each time when I call execute(), scala shell will create a new flink session cluster. But actually the session cluster will fail to launch, I havn't digger into it further, I suspect it is due to some conflict with the preview flink session cluster. was (Author: zjffdu): Yes,each time when I call execute(), scala shell will create a new flink session cluster. But actually the session cluster will fail to launch, I havn't digger into it further, I suspect it is due to some conflict with the preview flink session cluster. > scala shell in yarn mode is broken > -- > > Key: FLINK-17788 > URL: https://issues.apache.org/jira/browse/FLINK-17788 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.11.0 >Reporter: Jeff Zhang >Priority: Major > Labels: pull-request-available > > When I start scala shell in yarn mode, one yarn app will be launched, and > after I write some flink code and trigger a flink job, another yarn app will > be launched but would failed to launch due to some conflicts. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17788) scala shell in yarn mode is broken
[ https://issues.apache.org/jira/browse/FLINK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118249#comment-17118249 ] Jeff Zhang commented on FLINK-17788: Yes, each time I call execute(), the scala shell creates a new flink session cluster. But the session cluster actually fails to launch; I haven't dug into it further, but I suspect it is due to some conflict with the previous flink session cluster. > scala shell in yarn mode is broken > -- > > Key: FLINK-17788 > URL: https://issues.apache.org/jira/browse/FLINK-17788 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.11.0 >Reporter: Jeff Zhang >Priority: Major > Labels: pull-request-available > > When I start scala shell in yarn mode, one yarn app will be launched, and > after I write some flink code and trigger a flink job, another yarn app will > be launched but would failed to launch due to some conflicts. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17788) scala shell in yarn mode is broken
[ https://issues.apache.org/jira/browse/FLINK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17116822#comment-17116822 ] Jeff Zhang commented on FLINK-17788: [~kkl0u] The phenomenon I see is that when I launch the scala-shell in yarn mode, it launches one yarn app for the flink yarn session cluster. Then when I type some flink code to trigger a flink job, it launches another yarn app. The root cause is that the deployment option is set to yarn-per-job, which is incorrect. The deployment option actually gets set twice, which causes the wrong setting. * https://github.com/apache/flink/blob/master/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala#L248 * https://github.com/apache/flink/blob/master/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala#L287 This is not a clean fix; I think the whole flink scala shell module needs to be refactored to make the code clean. > scala shell in yarn mode is broken > -- > > Key: FLINK-17788 > URL: https://issues.apache.org/jira/browse/FLINK-17788 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.11.0 >Reporter: Jeff Zhang >Priority: Major > Labels: pull-request-available > > When I start scala shell in yarn mode, one yarn app will be launched, and > after I write some flink code and trigger a flink job, another yarn app will > be launched but would failed to launch due to some conflicts. -- This message was sent by Atlassian Jira (v8.3.4#803005)
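The double write described in the comment amounts to a last-write-wins overwrite of a single configuration key. A minimal plain-Java sketch, assuming a map-based config: `execution.target` mirrors Flink's real option key, but this is illustrative, not the FlinkShell implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the bug pattern: the shell first sets the deployment target for
// a yarn session cluster, then a second code path overwrites it with a
// per-job target -- and the last write wins, so each execute() call deploys
// a fresh per-job cluster instead of reusing the session.
class DeploymentTargetSketch {
    static String effectiveTarget() {
        Map<String, String> conf = new HashMap<>();
        conf.put("execution.target", "yarn-session");  // first write (intended)
        conf.put("execution.target", "yarn-per-job");  // second write (the bug)
        return conf.get("execution.target");
    }

    public static void main(String[] args) {
        System.out.println(effectiveTarget()); // yarn-per-job
    }
}
```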
[jira] [Commented] (FLINK-17944) Wrong output in sql client's table mode
[ https://issues.apache.org/jira/browse/FLINK-17944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17116765#comment-17116765 ] Jeff Zhang commented on FLINK-17944: I will make a PR for it > Wrong output in sql client's table mode > --- > > Key: FLINK-17944 > URL: https://issues.apache.org/jira/browse/FLINK-17944 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Client >Affects Versions: 1.11.0 >Reporter: Jeff Zhang >Priority: Major > > When I run the following sql example, I get the wrong output > {code:java} > SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), > ('Bob')) AS NameTable(name) GROUP BY name; {code} > > {code:java} > Bob 1 > Alice 1 > Greg 1 >Bob 2 {code} > This is due to we add kind in Row, so the sematics of equals method changes -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17944) Wrong output in sql client's table mode
Jeff Zhang created FLINK-17944: -- Summary: Wrong output in sql client's table mode Key: FLINK-17944 URL: https://issues.apache.org/jira/browse/FLINK-17944 Project: Flink Issue Type: Improvement Components: Table SQL / Client Affects Versions: 1.11.0 Reporter: Jeff Zhang When I run the following sql example, I get the wrong output
{code:java}
SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
{code}
{code:java}
Bob 1
Alice 1
Greg 1
Bob 2
{code}
This is because we added a kind to Row, so the semantics of the equals method changed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
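The equals-semantics change behind this output can be illustrated with a self-contained sketch. This is an assumed, simplified class, not Flink's actual Row implementation; the kind strings are only illustrative.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;

// Once a change-kind participates in equals(), two rows with identical
// fields but different kinds no longer compare equal, so materialization
// logic keyed on field equality keeps both rows ("Bob 1" and the later
// "Bob 2" update) instead of replacing the old one.
class RowSketch {
    final String kind;     // e.g. "+I" (insert) vs "+U" (update-after)
    final Object[] fields;

    RowSketch(String kind, Object... fields) {
        this.kind = kind;
        this.fields = fields;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof RowSketch)) return false;
        RowSketch other = (RowSketch) o;
        // kind is now part of equality -- the behavioral change at issue
        return kind.equals(other.kind) && Arrays.equals(fields, other.fields);
    }

    @Override
    public int hashCode() {
        return Objects.hash(kind, Arrays.hashCode(fields));
    }

    public static void main(String[] args) {
        Set<RowSketch> materialized = new LinkedHashSet<>();
        materialized.add(new RowSketch("+I", "Bob", 1L));
        materialized.add(new RowSketch("+U", "Bob", 1L)); // same fields, new kind
        System.out.println(materialized.size()); // 2 -- the old row is not replaced
    }
}
```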
[jira] [Commented] (FLINK-17889) flink-connector-hive jar contains wrong class in its SPI config file org.apache.flink.table.factories.TableFactory
[ https://issues.apache.org/jira/browse/FLINK-17889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17114191#comment-17114191 ] Jeff Zhang commented on FLINK-17889: \cc [~lirui] [~lzljs3620320] > flink-connector-hive jar contains wrong class in its SPI config file > org.apache.flink.table.factories.TableFactory > -- > > Key: FLINK-17889 > URL: https://issues.apache.org/jira/browse/FLINK-17889 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.11.0 >Reporter: Jeff Zhang >Priority: Major > > These 2 classes are in flink-connector-hive jar's SPI config file > {code:java} > org.apache.flink.orc.OrcFileSystemFormatFactory > License.org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory {code} > Due to this issue, I get the following exception in zeppelin side. > {code:java} > Caused by: java.util.ServiceConfigurationError: > org.apache.flink.table.factories.TableFactory: Provider > org.apache.flink.orc.OrcFileSystemFormatFactory not a subtypeCaused by: > java.util.ServiceConfigurationError: > org.apache.flink.table.factories.TableFactory: Provider > org.apache.flink.orc.OrcFileSystemFormatFactory not a subtype at > java.util.ServiceLoader.fail(ServiceLoader.java:239) at > java.util.ServiceLoader.access$300(ServiceLoader.java:185) at > java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376) at > java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) at > java.util.ServiceLoader$1.next(ServiceLoader.java:480) at > java.util.Iterator.forEachRemaining(Iterator.java:116) at > org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214) > ... 35 more {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
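The "Provider ... not a subtype" error quoted above comes from the subtype check `ServiceLoader` performs when loading SPI entries. A self-contained sketch of that check, with `Runnable` and `String` standing in for `TableFactory` and the misplaced factory class:

```java
import java.util.ServiceConfigurationError;

// ServiceLoader verifies that each class named in an SPI config file
// implements the requested service interface; a stray entry copied from
// another factory's SPI file fails that check at discovery time.
class SpiSubtypeSketch {
    static void checkProvider(Class<?> service, Class<?> provider) {
        if (!service.isAssignableFrom(provider)) {
            throw new ServiceConfigurationError(
                service.getName() + ": Provider " + provider.getName() + " not a subtype");
        }
    }

    public static void main(String[] args) {
        try {
            checkProvider(Runnable.class, String.class); // wrong entry in the SPI file
        } catch (ServiceConfigurationError e) {
            System.out.println(e.getMessage());
        }
    }
}
```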
[jira] [Updated] (FLINK-17889) flink-connector-hive jar contains wrong class in its SPI config file org.apache.flink.table.factories.TableFactory
[ https://issues.apache.org/jira/browse/FLINK-17889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-17889: --- Description: These 2 classes are in flink-connector-hive jar's SPI config file {code:java} org.apache.flink.orc.OrcFileSystemFormatFactory License.org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory {code} Due to this issue, I get the following exception in zeppelin side. {code:java} Caused by: java.util.ServiceConfigurationError: org.apache.flink.table.factories.TableFactory: Provider org.apache.flink.orc.OrcFileSystemFormatFactory not a subtypeCaused by: java.util.ServiceConfigurationError: org.apache.flink.table.factories.TableFactory: Provider org.apache.flink.orc.OrcFileSystemFormatFactory not a subtype at java.util.ServiceLoader.fail(ServiceLoader.java:239) at java.util.ServiceLoader.access$300(ServiceLoader.java:185) at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376) at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) at java.util.ServiceLoader$1.next(ServiceLoader.java:480) at java.util.Iterator.forEachRemaining(Iterator.java:116) at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214) ... 35 more {code} was: These 2 classes are in flink-connector-hive jar's SPI config file {code:java} org.apache.flink.orc.OrcFileSystemFormatFactory License.org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory {code} Due to this issue, I get the following exception in zeppelin side. 
{code:java} {code} > flink-connector-hive jar contains wrong class in its SPI config file > org.apache.flink.table.factories.TableFactory > -- > > Key: FLINK-17889 > URL: https://issues.apache.org/jira/browse/FLINK-17889 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.11.0 >Reporter: Jeff Zhang >Priority: Major > > These 2 classes are in flink-connector-hive jar's SPI config file > {code:java} > org.apache.flink.orc.OrcFileSystemFormatFactory > License.org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory {code} > Due to this issue, I get the following exception in zeppelin side. > {code:java} > Caused by: java.util.ServiceConfigurationError: > org.apache.flink.table.factories.TableFactory: Provider > org.apache.flink.orc.OrcFileSystemFormatFactory not a subtypeCaused by: > java.util.ServiceConfigurationError: > org.apache.flink.table.factories.TableFactory: Provider > org.apache.flink.orc.OrcFileSystemFormatFactory not a subtype at > java.util.ServiceLoader.fail(ServiceLoader.java:239) at > java.util.ServiceLoader.access$300(ServiceLoader.java:185) at > java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376) at > java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) at > java.util.ServiceLoader$1.next(ServiceLoader.java:480) at > java.util.Iterator.forEachRemaining(Iterator.java:116) at > org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214) > ... 35 more {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17889) flink-connector-hive jar contains wrong class in its SPI config file org.apache.flink.table.factories.TableFactory
Jeff Zhang created FLINK-17889: -- Summary: flink-connector-hive jar contains wrong class in its SPI config file org.apache.flink.table.factories.TableFactory Key: FLINK-17889 URL: https://issues.apache.org/jira/browse/FLINK-17889 Project: Flink Issue Type: Bug Components: Connectors / Hive Affects Versions: 1.11.0 Reporter: Jeff Zhang These 2 classes are in the flink-connector-hive jar's SPI config file
{code:java}
org.apache.flink.orc.OrcFileSystemFormatFactory
License.org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory
{code}
Due to this issue, I get the following exception on the Zeppelin side. {code:java} {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17870) dependent jars are missing to be shipped to cluster in scala shell
[ https://issues.apache.org/jira/browse/FLINK-17870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-17870: --- Summary: dependent jars are missing to be shipped to cluster in scala shell (was: scala shell jars are missing to be shipped to cluster) > dependent jars are missing to be shipped to cluster in scala shell > -- > > Key: FLINK-17870 > URL: https://issues.apache.org/jira/browse/FLINK-17870 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.11.0 >Reporter: Jeff Zhang >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17870) scala shell jars are missing to be shipped to cluster
Jeff Zhang created FLINK-17870: -- Summary: scala shell jars are missing to be shipped to cluster Key: FLINK-17870 URL: https://issues.apache.org/jira/browse/FLINK-17870 Project: Flink Issue Type: Bug Components: Scala Shell Affects Versions: 1.11.0 Reporter: Jeff Zhang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17788) scala shell in yarn mode is broken
Jeff Zhang created FLINK-17788: -- Summary: scala shell in yarn mode is broken Key: FLINK-17788 URL: https://issues.apache.org/jira/browse/FLINK-17788 Project: Flink Issue Type: Bug Components: Scala Shell Affects Versions: 1.11.0 Reporter: Jeff Zhang When I start the scala shell in yarn mode, one yarn app is launched; after I write some flink code and trigger a flink job, a second yarn app is launched but fails to start due to some conflicts. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17763) No log files when starting scala-shell
Jeff Zhang created FLINK-17763: -- Summary: No log files when starting scala-shell Key: FLINK-17763 URL: https://issues.apache.org/jira/browse/FLINK-17763 Project: Flink Issue Type: Bug Components: Scala Shell Affects Versions: 1.11.0 Reporter: Jeff Zhang I see the following error when starting the scala shell.
{code:java}
Starting Flink Shell:
ERROR StatusLogger No Log4j 2 configuration file found. Using default configuration (logging only errors to the console), or user programmatically provided configurations. Set system property 'log4j2.debug' to show Log4j 2 internal initialization logging. See https://logging.apache.org/log4j/2.x/manual/configuration.html for instructions on how to configure Log4j 2
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
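The `ERROR StatusLogger` line means no Log4j 2 configuration file was found on the shell's classpath, so only errors reach the console and no log files are written. A minimal console configuration would look like the following sketch (illustrative; where the file must live depends on how the scala shell assembles its classpath):

```properties
# minimal log4j2.properties: log INFO and above to the console
rootLogger.level = info
rootLogger.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```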
[jira] [Created] (FLINK-17588) Throw exception when hadoop jar is missing in flink scala's yarn mode.
Jeff Zhang created FLINK-17588: -- Summary: Throw exception when hadoop jar is missing in flink scala's yarn mode. Key: FLINK-17588 URL: https://issues.apache.org/jira/browse/FLINK-17588 Project: Flink Issue Type: Improvement Components: Scala Shell Affects Versions: 1.10.0 Reporter: Jeff Zhang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-10911) Enable flink-scala-shell with Scala 2.12
[ https://issues.apache.org/jira/browse/FLINK-10911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-10911: --- Summary: Enable flink-scala-shell with Scala 2.12 (was: Flink's flink-scala-shell is not working with Scala 2.12) > Enable flink-scala-shell with Scala 2.12 > > > Key: FLINK-10911 > URL: https://issues.apache.org/jira/browse/FLINK-10911 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Jeff Zhang >Priority: Major > Labels: pull-request-available > > Flink's {{flink-scala-shell}} module is not working with Scala 2.12. > Therefore, it is currently excluded from the Scala 2.12 builds. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-10911) Flink's flink-scala-shell is not working with Scala 2.12
[ https://issues.apache.org/jira/browse/FLINK-10911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092156#comment-17092156 ] Jeff Zhang commented on FLINK-10911: [~trohrmann] We still need to enable scala-2.12 for the flink scala shell; I will refine my PR to enable that. > Flink's flink-scala-shell is not working with Scala 2.12 > > > Key: FLINK-10911 > URL: https://issues.apache.org/jira/browse/FLINK-10911 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Jeff Zhang >Priority: Major > Labels: pull-request-available > > Flink's {{flink-scala-shell}} module is not working with Scala 2.12. > Therefore, it is currently excluded from the Scala 2.12 builds. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-10911) Flink's flink-scala-shell is not working with Scala 2.12
[ https://issues.apache.org/jira/browse/FLINK-10911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17091594#comment-17091594 ] Jeff Zhang commented on FLINK-10911: [~aljoscha] [~trohrmann] [~chesnay] I tested it on my local machine; the flink scala shell in scala 2.12 works for me. Besides, there's one PR where I enable the flink scala shell for the scala-2.12 profile. The build passed. [https://github.com/apache/flink/pull/11895] Do you remember what kind of issue you hit in the flink scala shell for scala-2.12? > Flink's flink-scala-shell is not working with Scala 2.12 > > > Key: FLINK-10911 > URL: https://issues.apache.org/jira/browse/FLINK-10911 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Jeff Zhang >Priority: Major > Labels: pull-request-available > > Flink's {{flink-scala-shell}} module is not working with Scala 2.12. > Therefore, it is currently excluded from the Scala 2.12 builds. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-10911) Flink's flink-scala-shell is not working with Scala 2.12
[ https://issues.apache.org/jira/browse/FLINK-10911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17090401#comment-17090401 ] Jeff Zhang commented on FLINK-10911: I will take a look at this issue > Flink's flink-scala-shell is not working with Scala 2.12 > > > Key: FLINK-10911 > URL: https://issues.apache.org/jira/browse/FLINK-10911 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Priority: Major > > Flink's {{flink-scala-shell}} module is not working with Scala 2.12. > Therefore, it is currently excluded from the Scala 2.12 builds. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-17225) Support native k8s for scala shell
[ https://issues.apache.org/jira/browse/FLINK-17225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085889#comment-17085889 ] Jeff Zhang edited comment on FLINK-17225 at 4/17/20, 3:59 PM: -- Thanks [~fly_in_gis] [~chesnay] Why does scala 2.12 support block this feature ? Does flink on k8s depend on scala 2.12 ? was (Author: zjffdu): Thanks [~fly_in_gis] [~chesnay] Why does scala 2.12 support block this feature ? Does flink on k8s depends on scala 2.12 ? > Support native k8s for scala shell > -- > > Key: FLINK-17225 > URL: https://issues.apache.org/jira/browse/FLINK-17225 > Project: Flink > Issue Type: New Feature > Components: Scala Shell >Reporter: Yang Wang >Priority: Major > > Currently, the Flink scala shell could create a new or retrieve an existing > YARN session cluster automatically. It is very convenient for the users. > Then it will be great we could also support the K8s deployment. Benefit from > native K8s integration, it is not very difficult for the implementation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17225) Support native k8s for scala shell
[ https://issues.apache.org/jira/browse/FLINK-17225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085889#comment-17085889 ] Jeff Zhang commented on FLINK-17225: Thanks [~fly_in_gis] [~chesnay] Why does scala 2.12 support block this feature? Does flink on k8s depend on scala 2.12? > Support native k8s for scala shell > -- > > Key: FLINK-17225 > URL: https://issues.apache.org/jira/browse/FLINK-17225 > Project: Flink > Issue Type: New Feature > Components: Scala Shell >Reporter: Yang Wang >Priority: Major > > Currently, the Flink scala shell could create a new or retrieve an existing > YARN session cluster automatically. It is very convenient for the users. > Then it will be great we could also support the K8s deployment. Benefit from > native K8s integration, it is not very difficult for the implementation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16936) TablEnv creation and planner execution must be in the same thread
[ https://issues.apache.org/jira/browse/FLINK-16936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085046#comment-17085046 ] Jeff Zhang commented on FLINK-16936: Good, thanks for letting me know that FLIP-84 will make SQL execution return after submission. > TablEnv creation and planner execution must be in the same thread > -- > > Key: FLINK-16936 > URL: https://issues.apache.org/jira/browse/FLINK-16936 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > I hit this issue in zeppelin. Let me first describe the thread mode of > zeppelin. In Zeppelin there're 3 threads. scalashell-thread is thread where > tableenv created, python thread is the python process thread, > python-javagateway-thread is the thread handling request from python > thread(same as pyflink). > Now if I use following table api, I will get the following exception. > {code:java} > st_env.from_path("cdn_access_log")\ >.select("uuid, " >"ip_to_province(client_ip) as province, " >"response_size, request_time")\ >.group_by("province")\ >.select( >"province, count(uuid) as access_count, " >"sum(response_size) as total_download, " >"sum(response_size) * 1.0 / sum(request_time) as download_speed") \ >.insert_into("cdn_access_statistic") {code} > Errors I get > {code:java} > Py4JJavaError: An error occurred while calling o60.insertInto. 
> : java.lang.RuntimeException: Error while applying rule > FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args > [rel#107:LogicalAggregate.NONE.any.None: > 0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) > at > 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334) > at > org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at py4j.Gateway.invoke(Gateway.java:282) > at
[jira] [Commented] (FLINK-16936) TablEnv creation and planner execution must be in the same thread
[ https://issues.apache.org/jira/browse/FLINK-16936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084996#comment-17084996 ] Jeff Zhang commented on FLINK-16936: [~ykt836] Is multi-threaded support not on the roadmap? IMO, this is necessary for batch, where users want to submit multiple SQL statements simultaneously; otherwise they have to wait until the previous statement finishes before submitting another. > TablEnv creation and planner execution must be in the same thread > -- > > Key: FLINK-16936 > URL: https://issues.apache.org/jira/browse/FLINK-16936 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > I hit this issue in zeppelin. Let me first describe the thread mode of > zeppelin. In Zeppelin there're 3 threads. scalashell-thread is thread where > tableenv created, python thread is the python process thread, > python-javagateway-thread is the thread handling request from python > thread(same as pyflink). > Now if I use following table api, I will get the following exception. > {code:java} > st_env.from_path("cdn_access_log")\ >.select("uuid, " >"ip_to_province(client_ip) as province, " >"response_size, request_time")\ >.group_by("province")\ >.select( >"province, count(uuid) as access_count, " >"sum(response_size) as total_download, " >"sum(response_size) * 1.0 / sum(request_time) as download_speed") \ >.insert_into("cdn_access_statistic") {code} > Errors I get > {code:java} > Py4JJavaError: An error occurred while calling o60.insertInto. 
> : java.lang.RuntimeException: Error while applying rule > FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args > [rel#107:LogicalAggregate.NONE.any.None: > 0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) > at > 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334) > at > org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at
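A common workaround for this kind of thread-affinity requirement (not a Flink API, just a general pattern sketched here) is to funnel every TableEnvironment call through one dedicated thread, so creation and planner execution always happen on the same thread regardless of which caller thread (scala shell, python gateway, etc.) initiated the request:

```python
import queue
import threading


class SingleThreadDispatcher:
    """Runs every submitted call on one dedicated worker thread.

    Illustrative sketch: when a library requires that setup and
    execution happen on the same thread, all callers can route
    their work through a dispatcher like this instead of calling
    the library directly from their own threads.
    """

    def __init__(self):
        self._tasks = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self):
        while True:
            fn, args, done, box = self._tasks.get()
            if fn is None:  # shutdown sentinel
                done.set()
                return
            try:
                box["result"] = fn(*args)
            except Exception as exc:  # propagate to the caller
                box["error"] = exc
            done.set()

    def submit(self, fn, *args):
        """Block until fn(*args) has run on the worker thread."""
        done, box = threading.Event(), {}
        self._tasks.put((fn, args, done, box))
        done.wait()
        if "error" in box:
            raise box["error"]
        return box["result"]

    def shutdown(self):
        done = threading.Event()
        self._tasks.put((None, (), done, {}))
        done.wait()
```

In the Zeppelin scenario above, both the scalashell thread and the python-javagateway thread would call `dispatcher.submit(...)` instead of touching the TableEnvironment directly, at the cost of serializing all planner work.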
[jira] [Updated] (FLINK-13811) Support converting flink table to pandas dataframe
[ https://issues.apache.org/jira/browse/FLINK-13811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-13811: --- Description: Pandas dataframe is the de facto standard tabular data format of the python community. It would be nice to have the ability to convert a flink table to a pandas dataframe. (was: Pandas dataframe is the refactor tableau data format of python community. It would be nice to have the ability to convert flink table to pandas dataframe.) > Support converting flink table to pandas dataframe > -- > > Key: FLINK-13811 > URL: https://issues.apache.org/jira/browse/FLINK-13811 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.9.0 >Reporter: Jeff Zhang >Priority: Major > > Pandas dataframe is the de facto standard tabular data format of the python > community. It would be nice to have the ability to convert a flink table to a > pandas dataframe. -- This message was sent by Atlassian Jira (v8.3.4#803005)
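The essence of such a conversion, sketched here in plain Python (this is illustrative only, not Flink's implementation), is collecting the table's rows and pivoting them into the column-oriented dict that `pandas.DataFrame` accepts directly:

```python
def rows_to_columns(field_names, rows):
    """Pivot row-oriented table data into a column dict.

    Illustrative sketch: a table-to-pandas conversion can collect the
    table's rows (tuples) plus its schema's field names, then build a
    dict mapping each field name to its column of values --
    pandas.DataFrame(result) would then yield the dataframe.
    """
    return {name: [row[i] for row in rows]
            for i, name in enumerate(field_names)}
```

A real implementation would additionally map Flink's data types to pandas/NumPy dtypes rather than relying on inference.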
[jira] [Commented] (FLINK-17124) The PyFlink Job runs into infinite loop if the UDF file imports job code.
[ https://issues.apache.org/jira/browse/FLINK-17124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17082394#comment-17082394 ] Jeff Zhang commented on FLINK-17124: Should this be a blocker issue for 1.10.1? > The PyFlink Job runs into infinite loop if the UDF file imports job code. > - > > Key: FLINK-17124 > URL: https://issues.apache.org/jira/browse/FLINK-17124 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.10.0, 1.11.0 >Reporter: Huang Xingbo >Priority: Major > > If the UDF file imports job code directly or indirectly, the PyFlink Job will > run into an infinite loop as follows: > - submit job > - execute job > - launch UDF worker > - import UDF > - (If the job file is depended by UDF or imported as the top level module) > import job code > - (If the job code is executed outside the "*if __name__ == '__main__':*") > launch gateway server and submit job to local executor > - execute job in local mode > - launch UDF worker > - import UDF > - import job code > ... > This infinite loop will create new Java processes and Python processes > endlessly until the resources on the machine are exhausted. We should fix it > ASAP. -- This message was sent by Atlassian Jira (v8.3.4#803005)
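The loop described above only starts when module-level code submits the job on import. As the report hints, guarding the submission behind `if __name__ == '__main__':` breaks the cycle; a minimal sketch of the safe job-file layout (the `my_udf`/`submit_job` names are hypothetical placeholders, not Flink API):

```python
# job.py -- sketch of the import guard that avoids the infinite loop.

def my_udf(x):
    # UDF logic lives at module level, so the UDF worker can import
    # this module without triggering any side effects.
    return x * 2


def submit_job():
    # Placeholder for building the pipeline and executing it; in a
    # real PyFlink job this would configure the TableEnvironment and
    # call execute().
    return "submitted"


if __name__ == "__main__":
    # Runs only when the file is executed directly, never when the
    # UDF worker imports this module -- which is what breaks the
    # import -> submit -> launch worker -> import ... cycle.
    submit_job()
```

With the guard in place, importing `job.py` from the UDF worker defines `my_udf` but never re-submits the job.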
[jira] [Commented] (FLINK-17114) When the pyflink job runs in local mode and the command "python" points to Python 2.7, the startup of the Python UDF worker will fail.
[ https://issues.apache.org/jira/browse/FLINK-17114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17082194#comment-17082194 ] Jeff Zhang commented on FLINK-17114: I think it depends on what the default python is on your machine. > When the pyflink job runs in local mode and the command "python" points to > Python 2.7, the startup of the Python UDF worker will fail. > -- > > Key: FLINK-17114 > URL: https://issues.apache.org/jira/browse/FLINK-17114 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.10.0, 1.11.0 >Reporter: Wei Zhong >Assignee: Wei Zhong >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > When the PyFlink job runs in local mode and the command "python" points to > Python 2.7, the startup of the Python UDF worker will fail because "python" > is the default interpreter of the Python UDF worker. For this case we need to > set the default value of "python.executable" to `sys.executable` i.e. the > python interpreter which launches the job. -- This message was sent by Atlassian Jira (v8.3.4#803005)
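The fix described in the ticket boils down to a fallback rule, sketched here (the `resolve_python_executable` helper and dict-style config access are illustrative, not Flink's actual config API):

```python
import sys


def resolve_python_executable(config):
    """Pick the interpreter used to launch the Python UDF worker.

    Illustrative sketch of the fix: if the user did not set
    "python.executable" explicitly, fall back to sys.executable --
    the interpreter that launched the job -- rather than the bare
    "python" command, which may resolve to Python 2.7 on machines
    where that is the system default.
    """
    return config.get("python.executable") or sys.executable
```

This way the UDF worker is guaranteed to run under the same interpreter version as the job itself unless the user overrides it.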
[jira] [Commented] (FLINK-16969) Unable to use case class in flink scala shell
[ https://issues.apache.org/jira/browse/FLINK-16969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17074442#comment-17074442 ] Jeff Zhang commented on FLINK-16969: \cc [~tiwalter] > Unable to use case class in flink scala shell > - > > Key: FLINK-16969 > URL: https://issues.apache.org/jira/browse/FLINK-16969 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > {code:java} > case class WC(word: String, count: Int) > val wordCounts = benv.fromElements(WC("hello", 1),WC("world", 2), WC("world", > 8)) > wordCounts.collect(){code} > Get the following exception > {code} > java.lang.IllegalArgumentException: requirement failed: > The class WC is an instance class, meaning it is not a member of a > toplevel object, or of an object contained in a toplevel object, > therefore it requires an outer instance to be instantiated, but we don't have > a > reference to the outer instance. Please consider changing the outer class to > an object. > at scala.Predef$.require(Predef.scala:224) > at > org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90) > at > org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.(ScalaCaseClassSerializer.scala:46) > ... 66 elided > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16969) Unable to use case class in flink scala shell
[ https://issues.apache.org/jira/browse/FLINK-16969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-16969: --- Description: {code:java} case class WC(word: String, count: Int) val wordCounts = benv.fromElements(WC("hello", 1),WC("world", 2), WC("world", 8)) wordCounts.collect(){code} Get the following exception {code} java.lang.IllegalArgumentException: requirement failed: The class WC is an instance class, meaning it is not a member of a toplevel object, or of an object contained in a toplevel object, therefore it requires an outer instance to be instantiated, but we don't have a reference to the outer instance. Please consider changing the outer class to an object. at scala.Predef$.require(Predef.scala:224) at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90) at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.(ScalaCaseClassSerializer.scala:46) ... 66 elided {code} was: {code:java} case class WC(word: String, count: Int)val wordCounts = benv.fromElements(WC("hello", 1),WC("world", 2), WC("world", 8)) wordCounts.collect() java.lang.IllegalArgumentException: requirement failed:The class WC is an instance class, meaning it is not a member of atoplevel object, or of an object contained in a toplevel object,therefore it requires an outer instance to be instantiated, but we don't have areference to the outer instance. Please consider changing the outer class to an object. at scala.Predef$.require(Predef.scala:224) at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90) at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.(ScalaCaseClassSerializer.scala:46) ... 
66 elided{code} > Unable to use case class in flink scala shell > - > > Key: FLINK-16969 > URL: https://issues.apache.org/jira/browse/FLINK-16969 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > {code:java} > case class WC(word: String, count: Int) > val wordCounts = benv.fromElements(WC("hello", 1),WC("world", 2), WC("world", > 8)) > wordCounts.collect(){code} > Get the following exception > {code} > java.lang.IllegalArgumentException: requirement failed: > The class WC is an instance class, meaning it is not a member of a > toplevel object, or of an object contained in a toplevel object, > therefore it requires an outer instance to be instantiated, but we don't have > a > reference to the outer instance. Please consider changing the outer class to > an object. > at scala.Predef$.require(Predef.scala:224) > at > org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90) > at > org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.(ScalaCaseClassSerializer.scala:46) > ... 66 elided > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16969) Unable to use case class in flink scala shell
Jeff Zhang created FLINK-16969: -- Summary: Unable to use case class in flink scala shell Key: FLINK-16969 URL: https://issues.apache.org/jira/browse/FLINK-16969 Project: Flink Issue Type: Bug Components: Scala Shell Affects Versions: 1.10.0 Reporter: Jeff Zhang {code:java} case class WC(word: String, count: Int)
val wordCounts = benv.fromElements(WC("hello", 1), WC("world", 2), WC("world", 8))
wordCounts.collect()
java.lang.IllegalArgumentException: requirement failed:
The class WC is an instance class, meaning it is not a member of a
toplevel object, or of an object contained in a toplevel object,
therefore it requires an outer instance to be instantiated, but we don't have a
reference to the outer instance. Please consider changing the outer class to an object.
 at scala.Predef$.require(Predef.scala:224)
 at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
 at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.(ScalaCaseClassSerializer.scala:46)
 ... 66 elided{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-16936) TablEnv creation and planner execution must be in the same thread
[ https://issues.apache.org/jira/browse/FLINK-16936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17073417#comment-17073417 ] Jeff Zhang edited comment on FLINK-16936 at 4/2/20, 6:44 AM: - [~godfreyhe] [~lzljs3620320] [~danny0405] [~jincheng][~tiwalter] was (Author: zjffdu): [~godfreyhe] [~lzljs3620320] [~danny0405] [~tiwalter] > TablEnv creation and planner execution must be in the same thread > -- > > Key: FLINK-16936 > URL: https://issues.apache.org/jira/browse/FLINK-16936 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > I hit this issue in zeppelin. Let me first describe the thread mode of > zeppelin. In Zeppelin there're 3 threads. scalashell-thread is thread where > tableenv created, python thread is the python process thread, > python-javagateway-thread is the thread handling request from python > thread(same as pyflink). > Now if I use following table api, I will get the following exception. > {code:java} > st_env.from_path("cdn_access_log")\ >.select("uuid, " >"ip_to_province(client_ip) as province, " >"response_size, request_time")\ >.group_by("province")\ >.select( >"province, count(uuid) as access_count, " >"sum(response_size) as total_download, " >"sum(response_size) * 1.0 / sum(request_time) as download_speed") \ >.insert_into("cdn_access_statistic") {code} > Errors I get > {code:java} > Py4JJavaError: An error occurred while calling o60.insertInto. 
> : java.lang.RuntimeException: Error while applying rule > FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args > [rel#107:LogicalAggregate.NONE.any.None: > 0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) > at > 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334) > at > org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at
[jira] [Comment Edited] (FLINK-16936) TablEnv creation and planner execution must be in the same thread
[ https://issues.apache.org/jira/browse/FLINK-16936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17073417#comment-17073417 ] Jeff Zhang edited comment on FLINK-16936 at 4/2/20, 6:44 AM: - [~godfreyhe] [~lzljs3620320] [~danny0405] [~jincheng] [~tiwalter] was (Author: zjffdu): [~godfreyhe] [~lzljs3620320] [~danny0405] [~jincheng][~tiwalter] > TablEnv creation and planner execution must be in the same thread > -- > > Key: FLINK-16936 > URL: https://issues.apache.org/jira/browse/FLINK-16936 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > I hit this issue in zeppelin. Let me first describe the thread mode of > zeppelin. In Zeppelin there're 3 threads. scalashell-thread is thread where > tableenv created, python thread is the python process thread, > python-javagateway-thread is the thread handling request from python > thread(same as pyflink). > Now if I use following table api, I will get the following exception. > {code:java} > st_env.from_path("cdn_access_log")\ >.select("uuid, " >"ip_to_province(client_ip) as province, " >"response_size, request_time")\ >.group_by("province")\ >.select( >"province, count(uuid) as access_count, " >"sum(response_size) as total_download, " >"sum(response_size) * 1.0 / sum(request_time) as download_speed") \ >.insert_into("cdn_access_statistic") {code} > Errors I get > {code:java} > Py4JJavaError: An error occurred while calling o60.insertInto. 
> : java.lang.RuntimeException: Error while applying rule > FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args > [rel#107:LogicalAggregate.NONE.any.None: > 0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) > at > 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334) > at > org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at
[jira] [Commented] (FLINK-16936) TablEnv creation and planner execution must be in the same thread
[ https://issues.apache.org/jira/browse/FLINK-16936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17073417#comment-17073417 ] Jeff Zhang commented on FLINK-16936: [~godfreyhe] [~lzljs3620320] [~danny0405] [~tiwalter] > TablEnv creation and planner execution must be in the same thread > -- > > Key: FLINK-16936 > URL: https://issues.apache.org/jira/browse/FLINK-16936 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > I hit this issue in zeppelin. Let me first describe the thread mode of > zeppelin. In Zeppelin there're 3 threads. scalashell-thread is thread where > tableenv created, python thread is the python process thread, > python-javagateway-thread is the thread handling request from python > thread(same as pyflink). > Now if I use following table api, I will get the following exception. > {code:java} > st_env.from_path("cdn_access_log")\ >.select("uuid, " >"ip_to_province(client_ip) as province, " >"response_size, request_time")\ >.group_by("province")\ >.select( >"province, count(uuid) as access_count, " >"sum(response_size) as total_download, " >"sum(response_size) * 1.0 / sum(request_time) as download_speed") \ >.insert_into("cdn_access_statistic") {code} > Errors I get > {code:java} > Py4JJavaError: An error occurred while calling o60.insertInto. 
> : java.lang.RuntimeException: Error while applying rule > FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args > [rel#107:LogicalAggregate.NONE.any.None: > 0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) > at > 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334) > at > org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at py4j.Gateway.invoke(Gateway.java:282) > at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at
[jira] [Updated] (FLINK-16936) TablEnv creation and planner execution must be in the same thread
[ https://issues.apache.org/jira/browse/FLINK-16936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-16936: --- Description: I hit this issue in zeppelin. Let me first describe the thread mode of zeppelin. In Zeppelin there're 3 threads. scalashell-thread is thread where tableenv created, python thread is the python process thread, python-javagateway-thread is the thread handling request from python thread(same as pyflink). Now if I use following table api, I will get the following exception. {code:java} st_env.from_path("cdn_access_log")\ .select("uuid, " "ip_to_province(client_ip) as province, " "response_size, request_time")\ .group_by("province")\ .select( "province, count(uuid) as access_count, " "sum(response_size) as total_download, " "sum(response_size) * 1.0 / sum(request_time) as download_speed") \ .insert_into("cdn_access_statistic") {code} Errors I get {code:java} Py4JJavaError: An error occurred while calling o60.insertInto. : java.lang.RuntimeException: Error while applying rule FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args [rel#107:LogicalAggregate.NONE.any.None: 0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))] at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355) at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334) at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at 
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL) at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:143) at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:236) at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:146) at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:208) ... 34 more Caused by: java.lang.NullPointerException
[jira] [Created] (FLINK-16936) TablEnv creation and planner execution must be in the same thread
Jeff Zhang created FLINK-16936: -- Summary: TablEnv creation and planner execution must be in the same thread Key: FLINK-16936 URL: https://issues.apache.org/jira/browse/FLINK-16936 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.10.0 Reporter: Jeff Zhang I hit this issue in Zeppelin. Let me first describe the thread model of Zeppelin. Here's one simple diagram which consists of 3 threads: scalashell-thread is the thread where the TableEnv is created, python-thread is the Python process thread, and python-javagateway-thread is the thread handling requests from the python thread (same as pyflink). Now if I use the following Table API, I get the following exception. {code:java} st_env.from_path("cdn_access_log")\ .select("uuid, " "ip_to_province(client_ip) as province, " "response_size, request_time")\ .group_by("province")\ .select( "province, count(uuid) as access_count, " "sum(response_size) as total_download, " "sum(response_size) * 1.0 / sum(request_time) as download_speed") \ .insert_into("cdn_access_statistic") {code} Errors I get: {code:java} Py4JJavaError: An error occurred while calling o60.insertInto. 
: java.lang.RuntimeException: Error while applying rule FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args [rel#107:LogicalAggregate.NONE.any.None: 0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))] at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355) at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334) at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL) at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:143) at
[jira] [Updated] (FLINK-16936) TablEnv creation and planner execution must be in the same thread
[ https://issues.apache.org/jira/browse/FLINK-16936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-16936: --- Description: I hit this issue in zeppelin. Let me first describe the thread mode of zeppelin. In Zeppelin there're 3 threads. scalashell-thread is thread where tableenv created, python thread is the python process thread, python-javagateway-thread is the thread handling request from python thread(same as pyflink). Now if I use following table api, I will get the following exception. {code:java} st_env.from_path("cdn_access_log")\ .select("uuid, " "ip_to_province(client_ip) as province, " "response_size, request_time")\ .group_by("province")\ .select( "province, count(uuid) as access_count, " "sum(response_size) as total_download, " "sum(response_size) * 1.0 / sum(request_time) as download_speed") \ .insert_into("cdn_access_statistic") {code} Errors I get {code:java} Py4JJavaError: An error occurred while calling o60.insertInto. : java.lang.RuntimeException: Error while applying rule FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args [rel#107:LogicalAggregate.NONE.any.None: 0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))] at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355) at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334) at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at 
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL) at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:143) at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:236) at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:146) at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:208) ... 34 more Caused by: java.lang.NullPointerException
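The thread-affinity failure described in this ticket can be illustrated outside Flink. Below is a minimal, hedged sketch in plain Python (hypothetical names, not Flink code): planner state kept in thread-local storage is only visible on the thread that created it, so driving the planner from a different thread (as the py4j gateway thread does here) finds no context and blows up, analogous to the NullPointerException above.

```python
import threading

class ToyPlanner:
    """Toy planner whose context lives in thread-local storage,
    mimicking the thread affinity described in FLINK-16936."""
    def __init__(self):
        self._local = threading.local()
        # Context is bound to the creating thread only.
        self._local.context = {"rules": ["FlinkLogicalAggregateStreamConverter"]}

    def optimize(self):
        # On any thread other than the creating one, `context` is unset.
        ctx = getattr(self._local, "context", None)
        if ctx is None:
            raise RuntimeError("planner context missing: wrong thread")
        return ctx["rules"]

planner = ToyPlanner()
print(planner.optimize())   # succeeds on the creating thread

errors = []
def run_from_gateway_thread():
    # Simulates the python-javagateway-thread calling into the planner.
    try:
        planner.optimize()
    except RuntimeError as e:
        errors.append(str(e))

t = threading.Thread(target=run_from_gateway_thread)
t.start()
t.join()
print(errors)               # the cross-thread call fails
```

This is why creating the TableEnv on scalashell-thread but executing the planner from the gateway thread breaks: the fix is to route both through the same thread.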
[jira] [Comment Edited] (FLINK-16797) Flink doesn't merge multiple sinks into one DAG
[ https://issues.apache.org/jira/browse/FLINK-16797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17067678#comment-17067678 ] Jeff Zhang edited comment on FLINK-16797 at 3/26/20, 1:29 PM: -- Thanks [~godfreyhe] [~jark] [~twalthr] was (Author: zjffdu): Thanks [~godfreyhe] & [~jark] > Flink doesn't merge multiple sinks into one DAG > --- > > Key: FLINK-16797 > URL: https://issues.apache.org/jira/browse/FLINK-16797 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > Here's sql I used. > {code:java} > insert into sink_kafka select status, direction, cast(event_ts/10 as > timestamp(3)) from source_kafka where status <> 'foo'; > insert into sink_kafka2 select status, direction, cast(event_ts/10 as > timestamp(3)) from source_kafka where status <> 'foo'; > {code} > Ideally flink should run these 2 sql as one dag with 2 sinks, but what I see > is that flink won't merge them into one dag. > !https://cdn-images-1.medium.com/max/1760/1*mFu6OZivrfGUgu1UVCcy6A.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16797) Flink doesn't merge multiple sinks into one DAG
[ https://issues.apache.org/jira/browse/FLINK-16797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17067678#comment-17067678 ] Jeff Zhang commented on FLINK-16797: Thanks [~godfreyhe] & [~jark] > Flink doesn't merge multiple sinks into one DAG > --- > > Key: FLINK-16797 > URL: https://issues.apache.org/jira/browse/FLINK-16797 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > Here's sql I used. > {code:java} > insert into sink_kafka select status, direction, cast(event_ts/10 as > timestamp(3)) from source_kafka where status <> 'foo'; > insert into sink_kafka2 select status, direction, cast(event_ts/10 as > timestamp(3)) from source_kafka where status <> 'foo'; > {code} > Ideally flink should run these 2 sql as one dag with 2 sinks, but what I see > is that flink won't merge them into one dag. > !https://cdn-images-1.medium.com/max/1760/1*mFu6OZivrfGUgu1UVCcy6A.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16797) Flink doesn't merge multiple sinks into one DAG
[ https://issues.apache.org/jira/browse/FLINK-16797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17067524#comment-17067524 ] Jeff Zhang commented on FLINK-16797: [~twalthr] I just checked FLIP-84, but it looks like it is about API refactoring. The issue in this ticket is more about the planner & optimizer, I believe. > Flink doesn't merge multiple sinks into one DAG > --- > > Key: FLINK-16797 > URL: https://issues.apache.org/jira/browse/FLINK-16797 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > Here's sql I used. > {code:java} > insert into sink_kafka select status, direction, cast(event_ts/10 as > timestamp(3)) from source_kafka where status <> 'foo'; > insert into sink_kafka2 select status, direction, cast(event_ts/10 as > timestamp(3)) from source_kafka where status <> 'foo'; > {code} > Ideally flink should run these 2 sql as one dag with 2 sinks, but what I see > is that flink won't merge them into one dag. > !https://cdn-images-1.medium.com/max/1760/1*mFu6OZivrfGUgu1UVCcy6A.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16797) Flink doesn't merge multiple sinks into one DAG
[ https://issues.apache.org/jira/browse/FLINK-16797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17067523#comment-17067523 ] Jeff Zhang commented on FLINK-16797: Thanks for the info [~twalthr] > Flink doesn't merge multiple sinks into one DAG > --- > > Key: FLINK-16797 > URL: https://issues.apache.org/jira/browse/FLINK-16797 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > Here's sql I used. > {code:java} > insert into sink_kafka select status, direction, cast(event_ts/10 as > timestamp(3)) from source_kafka where status <> 'foo'; > insert into sink_kafka2 select status, direction, cast(event_ts/10 as > timestamp(3)) from source_kafka where status <> 'foo'; > {code} > Ideally flink should run these 2 sql as one dag with 2 sinks, but what I see > is that flink won't merge them into one dag. > !https://cdn-images-1.medium.com/max/1760/1*mFu6OZivrfGUgu1UVCcy6A.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16797) Flink doesn't merge multiple sinks into one DAG
Jeff Zhang created FLINK-16797: -- Summary: Flink doesn't merge multiple sinks into one DAG Key: FLINK-16797 URL: https://issues.apache.org/jira/browse/FLINK-16797 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.10.0 Reporter: Jeff Zhang Here's the SQL I used. {code:java} insert into sink_kafka select status, direction, cast(event_ts/10 as timestamp(3)) from source_kafka where status <> 'foo'; insert into sink_kafka2 select status, direction, cast(event_ts/10 as timestamp(3)) from source_kafka where status <> 'foo'; {code} Ideally Flink should run these 2 SQL statements as one DAG with 2 sinks, but what I see is that Flink won't merge them into one DAG. !https://cdn-images-1.medium.com/max/1760/1*mFu6OZivrfGUgu1UVCcy6A.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
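The missing optimization can be sketched in plain Python. This is a hedged toy model, not Flink's planner: a common-sub-graph optimizer (Flink's is StreamCommonSubGraphBasedOptimizer) keys plan nodes by their content, so the two INSERT statements, which share the same scan and the same filter predicate, collapse into one DAG with a single source node feeding two sinks.

```python
# Toy common-sub-graph merge: nodes are keyed by content, so identical
# scans/filters from different statements dedupe into shared DAG nodes.
def build_dag(statements):
    """statements: iterable of (source, predicate, sink) triples."""
    nodes, edges = set(), set()
    for source, predicate, sink in statements:
        scan = ("scan", source)
        # Identical (source, predicate) pairs dedupe to one filter node.
        flt = ("filter", (source, predicate))
        snk = ("sink", sink)
        nodes.update([scan, flt, snk])
        edges.update([(scan, flt), (flt, snk)])
    return nodes, edges

nodes, edges = build_dag([
    ("source_kafka", "status <> 'foo'", "sink_kafka"),
    ("source_kafka", "status <> 'foo'", "sink_kafka2"),
])
scans = [n for n in nodes if n[0] == "scan"]
sinks = [n for n in nodes if n[0] == "sink"]
print(len(scans), len(sinks))   # one shared scan, two sinks
```

Without this merge, each statement is planned in isolation and the shared source is scanned twice, which is exactly the behavior reported above.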
[jira] [Updated] (FLINK-16604) Column key in JM configuration is too narrow
[ https://issues.apache.org/jira/browse/FLINK-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-16604: --- Priority: Minor (was: Major) > Column key in JM configuration is too narrow > > > Key: FLINK-16604 > URL: https://issues.apache.org/jira/browse/FLINK-16604 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Minor > Attachments: image-2020-03-15-22-22-01-696.png > > > See the screenshot > > !image-2020-03-15-22-22-01-696.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16604) Column key in JM configuration is too narrow
[ https://issues.apache.org/jira/browse/FLINK-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059711#comment-17059711 ] Jeff Zhang commented on FLINK-16604: \cc [~vthinkxie] > Column key in JM configuration is too narrow > > > Key: FLINK-16604 > URL: https://issues.apache.org/jira/browse/FLINK-16604 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > Attachments: image-2020-03-15-22-22-01-696.png > > > See the screenshot > > !image-2020-03-15-22-22-01-696.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16604) Column key in JM configuration is too narrow
[ https://issues.apache.org/jira/browse/FLINK-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-16604: --- Attachment: (was: image-2020-03-15-22-21-17-369.png) > Column key in JM configuration is too narrow > > > Key: FLINK-16604 > URL: https://issues.apache.org/jira/browse/FLINK-16604 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > Attachments: image-2020-03-15-22-22-01-696.png > > > See the screenshot > > !image-2020-03-15-22-22-01-696.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16604) Column key in JM configuration is too narrow
[ https://issues.apache.org/jira/browse/FLINK-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-16604: --- Description: See the screenshot !image-2020-03-15-22-22-01-696.png! was: See the screenshot !image-2020-03-15-22-21-17-369.png! > Column key in JM configuration is too narrow > > > Key: FLINK-16604 > URL: https://issues.apache.org/jira/browse/FLINK-16604 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > Attachments: image-2020-03-15-22-21-17-369.png, > image-2020-03-15-22-22-01-696.png > > > See the screenshot > > !image-2020-03-15-22-22-01-696.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16604) Column key in JM configuration is too narrow
[ https://issues.apache.org/jira/browse/FLINK-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-16604: --- Attachment: image-2020-03-15-22-22-01-696.png > Column key in JM configuration is too narrow > > > Key: FLINK-16604 > URL: https://issues.apache.org/jira/browse/FLINK-16604 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > Attachments: image-2020-03-15-22-21-17-369.png, > image-2020-03-15-22-22-01-696.png > > > See the screenshot > > !image-2020-03-15-22-21-17-369.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16604) Column key in JM configuration is too narrow
Jeff Zhang created FLINK-16604: -- Summary: Column key in JM configuration is too narrow Key: FLINK-16604 URL: https://issues.apache.org/jira/browse/FLINK-16604 Project: Flink Issue Type: Improvement Components: Runtime / Web Frontend Affects Versions: 1.10.0 Reporter: Jeff Zhang Attachments: image-2020-03-15-22-21-17-369.png See the screenshot !image-2020-03-15-22-21-17-369.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16418) Hide hive version to avoid user confuse
[ https://issues.apache.org/jira/browse/FLINK-16418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17051021#comment-17051021 ] Jeff Zhang commented on FLINK-16418: [~lzljs3620320] Do you mean to detect the Hive version for users? > Hide hive version to avoid user confuse > --- > > Key: FLINK-16418 > URL: https://issues.apache.org/jira/browse/FLINK-16418 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Fix For: 1.11.0 > > > Version in Yaml/HiveCatalog needs to be consistent with the dependencies > version. There are three places: version in metastore, version in > dependencies, version in Yaml/HiveCatalog, users are easy to make mistakes. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-14991) Export `FLINK_HOME` environment variable to all the entrypoint
[ https://issues.apache.org/jira/browse/FLINK-14991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16985271#comment-16985271 ] Jeff Zhang edited comment on FLINK-14991 at 2/28/20 1:43 PM: - +1, but it would be better to reserve `FLINK_CONF_DIR`. That means we could derive `FLINK_LIB_DIR`,`FLINK_OPT_DIR`,`FLINK_PLUGIN_DIR`,`FLINK_BIN_DIR` from `FLINK_HOME`, but try to look for env `FLINK_CONF_DIR` first, if it doesn't exist, then use `FLINK_HOME/conf`. The reason is that some vendor flink distribution will define `FLINK_CONF_DIR` separately which doesn't need to be under `FLINK_HOME`, e.g. /etc/flink/conf was (Author: zjffdu): +1, but it would be better to reserve `FLINK_CONF_DIR`. That means we could derive `FLINK_LIB_DIR`,`FLINK_OPT_DIR`,`FLINK_PLUGIN_DIR`,`FLINK_BIN_DIR` from `FLINK_HOME`, but try to look for env `FLINK_CONF_DIR` first, if it doesn't exist, then use `FLINK_HOME/conf`. The reason is that some vendor flink distribution will define `FLINK_HOME/conf` separately. > Export `FLINK_HOME` environment variable to all the entrypoint > -- > > Key: FLINK-14991 > URL: https://issues.apache.org/jira/browse/FLINK-14991 > Project: Flink > Issue Type: Improvement > Components: Command Line Client, Deployment / Scripts >Reporter: Guowei Ma >Priority: Minor > > Currently, Flink depends on 6 types of files: configuration files, system > jars files, script files、library jar files, plugin jar files, and user jars > files. These files are in different directories. > Flink exports 5 environment variables to locate these different type files: > `FLINK_CONF_DIR`,`FLINK_LIB_DIR`,`FLINK_OPT_DIR`,`FLINK_PLUGIN_DIR`,`FLINK_BIN_DIR`. > It is not a good style that exports an environment variable for every type of > file. > So this jira proposes to export the `FLINK_HOME` environment variable to all > the entrypoint. 
Derive the directory of the different type files from the > `FLINK_HOME` environment variable and every type file has a fixed directory > name. > This also has another benefit that the method implies the directory > structure is the same in all the situations. -- This message was sent by Atlassian Jira (v8.3.4#803005)
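The lookup order proposed in the comment above can be sketched as follows. This is a hypothetical helper, not the actual Flink shell scripts: every directory is derived from `FLINK_HOME`, but an explicit `FLINK_CONF_DIR` takes precedence when set, which covers vendor distributions that keep the conf dir outside `FLINK_HOME`.

```python
def flink_dirs(env):
    """Hedged sketch of the proposed lookup order: derive all Flink
    directories from FLINK_HOME, but honor an explicit FLINK_CONF_DIR
    override (hypothetical helper, not Flink's real scripts)."""
    home = env["FLINK_HOME"]
    return {
        "FLINK_BIN_DIR": home + "/bin",
        "FLINK_LIB_DIR": home + "/lib",
        "FLINK_OPT_DIR": home + "/opt",
        "FLINK_PLUGIN_DIR": home + "/plugins",
        # Vendor distributions may define FLINK_CONF_DIR outside
        # FLINK_HOME (e.g. /etc/flink/conf), so check the override first.
        "FLINK_CONF_DIR": env.get("FLINK_CONF_DIR", home + "/conf"),
    }

print(flink_dirs({"FLINK_HOME": "/opt/flink"})["FLINK_CONF_DIR"])
print(flink_dirs({"FLINK_HOME": "/opt/flink",
                  "FLINK_CONF_DIR": "/etc/flink/conf"})["FLINK_CONF_DIR"])
```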
[jira] [Closed] (FLINK-16286) Support select from nothing
[ https://issues.apache.org/jira/browse/FLINK-16286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang closed FLINK-16286. -- Resolution: Invalid > Support select from nothing > --- > > Key: FLINK-16286 > URL: https://issues.apache.org/jira/browse/FLINK-16286 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > It would be nice to support from noting in flink sql. e.g. > {code:java} > select "name", 1+1, Date() {code} > This is especially useful when user want to try udf provided by others > without creating new table for faked data. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16286) Support select from nothing
[ https://issues.apache.org/jira/browse/FLINK-16286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17045244#comment-17045244 ] Jeff Zhang commented on FLINK-16286: Sorry, my fault. I used double quotes, which caused the parsing error. I should have used single quotes: {code:java} select 'name' {code} > Support select from nothing > --- > > Key: FLINK-16286 > URL: https://issues.apache.org/jira/browse/FLINK-16286 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > It would be nice to support from noting in flink sql. e.g. > {code:java} > select "name", 1+1, Date() {code} > This is especially useful when user want to try udf provided by others > without creating new table for faked data. -- This message was sent by Atlassian Jira (v8.3.4#803005)
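For context on the comment above: standard SQL reserves single quotes for string literals and double quotes for identifiers, which is why the double-quoted form failed to parse as a string. A quick illustration using Python's stdlib sqlite3 (not Flink, shown only for the literal syntax and the table-less SELECT):

```python
import sqlite3

# Single quotes make a string literal, and this "select from nothing"
# needs no table at all, which is exactly what the ticket asks for.
conn = sqlite3.connect(":memory:")
row = conn.execute("select 'name', 1 + 1, date('now')").fetchone()
print(row[0], row[1])   # the constant string and the arithmetic result
```

(SQLite is notoriously lenient about double-quoted strings, so only the correct single-quote form is shown here.)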
[jira] [Commented] (FLINK-16286) Support select from nothing
[ https://issues.apache.org/jira/browse/FLINK-16286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17045228#comment-17045228 ] Jeff Zhang commented on FLINK-16286: Is it supported in 1.10? I just tried Flink 1.10. > Support select from nothing > --- > > Key: FLINK-16286 > URL: https://issues.apache.org/jira/browse/FLINK-16286 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > It would be nice to support from noting in flink sql. e.g. > {code:java} > select "name", 1+1, Date() {code} > This is especially useful when user want to try udf provided by others > without creating new table for faked data. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16286) Support select from nothing
[ https://issues.apache.org/jira/browse/FLINK-16286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-16286: --- Component/s: Table SQL / API > Support select from nothing > --- > > Key: FLINK-16286 > URL: https://issues.apache.org/jira/browse/FLINK-16286 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > It would be nice to support from noting in flink sql. e.g. > {code:java} > select "name", 1+1, Date() {code} > This is especially useful when user want to try udf provided by others > without creating new table for faked data. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16286) Support select from nothing
Jeff Zhang created FLINK-16286: -- Summary: Support select from nothing Key: FLINK-16286 URL: https://issues.apache.org/jira/browse/FLINK-16286 Project: Flink Issue Type: New Feature Affects Versions: 1.10.0 Reporter: Jeff Zhang It would be nice to support select from nothing in Flink SQL, e.g. {code:java} select "name", 1+1, Date() {code} This is especially useful when users want to try UDFs provided by others without creating a new table of fake data. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16266) es6 & 7 table sql connector conflicts each other
[ https://issues.apache.org/jira/browse/FLINK-16266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17044043#comment-17044043 ] Jeff Zhang commented on FLINK-16266: AFAIK the plugin mechanism is not ready for connectors yet. Others may know more details. > es6 & 7 table sql connector conflicts each other > > > Key: FLINK-16266 > URL: https://issues.apache.org/jira/browse/FLINK-16266 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch >Affects Versions: 1.10.0 >Reporter: Benchao Li >Priority: Major > > If we put {{flink-sql-connector-elasticsearch6}} and > {{flink-sql-connector-elasticsearch7}} into {{/lib}} at the same time, and > use them in {{sql-client}}, we will get exceptions like: > > {code:java} > [ERROR] Could not execute SQL statement. Reason: > java.lang.AbstractMethodError: > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase.createElasticsearchUpsertTableSink(ZLorg/apache/flink/table/api/TableSchema;Ljava/util/List;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Lorg/apache/flink/api/common/serialization/SerializationSchema;Lorg/apache/flink/elasticsearch6/shaded/org/elasticsearch/common/xcontent/XContentType;Lorg/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler;Ljava/util/Map;)Lorg/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase; > {code} > > After analyzing the exceptions, IMO, it's because > {{flink-connector-elasticsearch-base}} is included in both > {{flink-sql-connector-elasticsearch6}} and > {{flink-sql-connector-elasticsearch7}}. And > {{flink-connector-elasticsearch-base}} has different implementations in 6 > & 7, because the version of Elasticsearch is different. > > A simple way to fix this is to relocate > {{flink-connector-elasticsearch-base}} in > {{flink-sql-connector-elasticsearch6}} and > {{flink-sql-connector-elasticsearch7}}. For example, for > {{flink-sql-connector-elasticsearch7}}: > {code:xml} > <relocation> > <pattern>org.apache.flink.streaming.connectors.elasticsearch.</pattern> > <shadedPattern>org.apache.flink.streaming.connectors.elasticsearch7.base.</shadedPattern> > </relocation> > {code} > cc [~jark] [~twalthr] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16266) es6 & 7 table sql connector conflicts each other
[ https://issues.apache.org/jira/browse/FLINK-16266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17044016#comment-17044016 ] Jeff Zhang commented on FLINK-16266: BTW, IMHO it is not best practice to put connector jars under lib, because most of the time FLINK_HOME is shared by multiple users/applications. Putting third-party jars under lib would affect other Flink applications. > es6 & 7 table sql connector conflicts each other > > > Key: FLINK-16266 > URL: https://issues.apache.org/jira/browse/FLINK-16266 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch >Affects Versions: 1.10.0 >Reporter: Benchao Li >Priority: Major > > If we put {{flink-sql-connector-elasticsearch6}} and > {{flink-sql-connector-elasticsearch7}} into {{/lib}} at the same time, and > use them in {{sql-client}}, we will get exceptions like: > > {code:java} > [ERROR] Could not execute SQL statement. Reason: > java.lang.AbstractMethodError: > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase.createElasticsearchUpsertTableSink(ZLorg/apache/flink/table/api/TableSchema;Ljava/util/List;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Lorg/apache/flink/api/common/serialization/SerializationSchema;Lorg/apache/flink/elasticsearch6/shaded/org/elasticsearch/common/xcontent/XContentType;Lorg/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler;Ljava/util/Map;)Lorg/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase; > {code} > > After analyzing the exceptions, IMO, it's because > {{flink-connector-elasticsearch-base}} is included in both > {{flink-sql-connector-elasticsearch6}} and > {{flink-sql-connector-elasticsearch7}}. And > {{flink-connector-elasticsearch-base}} has different implementations in 6 > & 7, because the version of Elasticsearch is different. > > A simple way to fix this is to relocate > {{flink-connector-elasticsearch-base}} in > {{flink-sql-connector-elasticsearch6}} and > {{flink-sql-connector-elasticsearch7}}. For example, for > {{flink-sql-connector-elasticsearch7}}: > {code:xml} > <relocation> > <pattern>org.apache.flink.streaming.connectors.elasticsearch.</pattern> > <shadedPattern>org.apache.flink.streaming.connectors.elasticsearch7.base.</shadedPattern> > </relocation> > {code} > cc [~jark] [~twalthr] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16266) es6 & 7 table sql connector conflicts each other
[ https://issues.apache.org/jira/browse/FLINK-16266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17044013#comment-17044013 ] Jeff Zhang commented on FLINK-16266: I believe this could be resolved via the plugin mechanism. > es6 & 7 table sql connector conflicts each other > > > Key: FLINK-16266 > URL: https://issues.apache.org/jira/browse/FLINK-16266 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch >Affects Versions: 1.10.0 >Reporter: Benchao Li >Priority: Major > > If we put {{flink-sql-connector-elasticsearch6}} and > {{flink-sql-connector-elasticsearch7}} into {{/lib}} at the same time, and > use them in {{sql-client}}, we will get exceptions like: > > {code:java} > [ERROR] Could not execute SQL statement. Reason: > java.lang.AbstractMethodError: > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase.createElasticsearchUpsertTableSink(ZLorg/apache/flink/table/api/TableSchema;Ljava/util/List;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Lorg/apache/flink/api/common/serialization/SerializationSchema;Lorg/apache/flink/elasticsearch6/shaded/org/elasticsearch/common/xcontent/XContentType;Lorg/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler;Ljava/util/Map;)Lorg/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase; > {code} > > After analyzing the exceptions, IMO, it's because > {{flink-connector-elasticsearch-base}} is included in both > {{flink-sql-connector-elasticsearch6}} and > {{flink-sql-connector-elasticsearch7}}. And > {{flink-connector-elasticsearch-base}} has different implementations in 6 > & 7, because the version of Elasticsearch is different. > > A simple way to fix this is to relocate > {{flink-connector-elasticsearch-base}} in > {{flink-sql-connector-elasticsearch6}} and > {{flink-sql-connector-elasticsearch7}}. For example, for > {{flink-sql-connector-elasticsearch7}}: > {code:xml} > <relocation> > <pattern>org.apache.flink.streaming.connectors.elasticsearch.</pattern> > <shadedPattern>org.apache.flink.streaming.connectors.elasticsearch7.base.</shadedPattern> > </relocation> > {code} > cc [~jark] [~twalthr] -- This message was sent by Atlassian Jira (v8.3.4#803005)
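For reference, a relocation like the one suggested in this thread would sit in the maven-shade-plugin configuration of the sql-connector module's POM. A minimal sketch, assuming a standard shade-plugin setup (the plugin coordinates, phase, and surrounding structure are the usual shade-plugin defaults, not taken from the actual Flink build files):

{code:xml}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <!-- Give this sql-connector its own copy of the base package,
               so es6 and es7 uber-jars no longer clash on the classpath -->
          <relocation>
            <pattern>org.apache.flink.streaming.connectors.elasticsearch.</pattern>
            <shadedPattern>org.apache.flink.streaming.connectors.elasticsearch7.base.</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
{code}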
[jira] [Updated] (FLINK-15935) Unable to use watermark when depends both on flink planner and blink planner
[ https://issues.apache.org/jira/browse/FLINK-15935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-15935: --- Component/s: Table SQL / Planner > Unable to use watermark when depends both on flink planner and blink planner > > > Key: FLINK-15935 > URL: https://issues.apache.org/jira/browse/FLINK-15935 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Blocker > > Run the following code in module `flink-examples-table` (must be under this > module) > {code:java} > /* > * Licensed to the Apache Software Foundation (ASF) under one > * or more contributor license agreements. See the NOTICE file > * distributed with this work for additional information > * regarding copyright ownership. The ASF licenses this file > * to you under the Apache License, Version 2.0 (the > * "License"); you may not use this file except in compliance > * with the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > package org.apache.flink.table.examples.java; > import org.apache.flink.streaming.api.TimeCharacteristic; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.java.StreamTableEnvironment; > /** > * javadoc. 
> */ > public class TableApiExample { >/** > * > * @param args > * @throws Exception > */ >public static void main(String[] args) throws Exception { > StreamExecutionEnvironment bsEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > EnvironmentSettings bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment bsTableEnv = > StreamTableEnvironment.create(bsEnv, bsSettings); > bsTableEnv.sqlUpdate( "CREATE TABLE sink_kafka (\n" + > "status STRING,\n" + > "direction STRING,\n" + > "event_ts TIMESTAMP(3),\n" + > "WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND\n" + > ") WITH (\n" + > " 'connector.type' = 'kafka', \n" + > " 'connector.version' = 'universal',\n" + > " 'connector.topic' = 'generated.events2',\n" + > " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + > " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + > " 'connector.properties.group.id' = 'testGroup',\n" + > " 'format.type'='json',\n" + > " 'update-mode' = 'append'\n" + > ")\n"); >} > } > {code} > > And hit the following error: > {code:java} > Exception in thread "main" > org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to > line 5, column 38: Unknown identifier 'event_ts'Exception in thread "main" > org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to > line 5, column 38: Unknown identifier 'event_ts' at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at 
org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) at > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) at > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) > at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501) at >
[jira] [Updated] (FLINK-15935) Unable to use watermark when depends both on flink planner and blink planner
[ https://issues.apache.org/jira/browse/FLINK-15935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-15935: --- Description: Run the following code in module `flink-examples-table` (must be under this module) {code:java} /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.flink.table.examples.java; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.java.StreamTableEnvironment; /** * javadoc. 
*/ public class TableApiExample { /** * * @param args * @throws Exception */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); bsTableEnv.sqlUpdate( "CREATE TABLE sink_kafka (\n" + "status STRING,\n" + "direction STRING,\n" + "event_ts TIMESTAMP(3),\n" + "WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND\n" + ") WITH (\n" + " 'connector.type' = 'kafka', \n" + " 'connector.version' = 'universal',\n" + " 'connector.topic' = 'generated.events2',\n" + " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + " 'connector.properties.group.id' = 'testGroup',\n" + " 'format.type'='json',\n" + " 'update-mode' = 'append'\n" + ")\n"); } } {code} And hit the following error: {code:java} Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to line 5, column 38: Unknown identifier 'event_ts'Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to line 5, column 38: Unknown identifier 'event_ts' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) 
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501) at org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:947) at
[jira] [Commented] (FLINK-15935) Unable to use watermark when depends both on flink planner and blink planner
[ https://issues.apache.org/jira/browse/FLINK-15935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17031320#comment-17031320 ] Jeff Zhang commented on FLINK-15935: \cc [~jark] [~lzljs3620320] [~ykt836] [~liyu] [~gjy] > Unable to use watermark when depends both on flink planner and blink planner > > > Key: FLINK-15935 > URL: https://issues.apache.org/jira/browse/FLINK-15935 > Project: Flink > Issue Type: Bug >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Blocker > > Run the following code in module `flink-table-examples` (must be under this > module) > {code:java} > /* > * Licensed to the Apache Software Foundation (ASF) under one > * or more contributor license agreements. See the NOTICE file > * distributed with this work for additional information > * regarding copyright ownership. The ASF licenses this file > * to you under the Apache License, Version 2.0 (the > * "License"); you may not use this file except in compliance > * with the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > package org.apache.flink.table.examples.java; > import org.apache.flink.streaming.api.TimeCharacteristic; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.java.StreamTableEnvironment; > /** > * javadoc. 
> */ > public class TableApiExample { >/** > * > * @param args > * @throws Exception > */ >public static void main(String[] args) throws Exception { > StreamExecutionEnvironment bsEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > EnvironmentSettings bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment bsTableEnv = > StreamTableEnvironment.create(bsEnv, bsSettings); > bsTableEnv.sqlUpdate( "CREATE TABLE sink_kafka (\n" + > "status STRING,\n" + > "direction STRING,\n" + > "event_ts TIMESTAMP(3),\n" + > "WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND\n" + > ") WITH (\n" + > " 'connector.type' = 'kafka', \n" + > " 'connector.version' = 'universal',\n" + > " 'connector.topic' = 'generated.events2',\n" + > " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + > " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + > " 'connector.properties.group.id' = 'testGroup',\n" + > " 'format.type'='json',\n" + > " 'update-mode' = 'append'\n" + > ")\n"); >} > } > {code} > > And hit the following error: > {code:java} > Exception in thread "main" > org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to > line 5, column 38: Unknown identifier 'event_ts'Exception in thread "main" > org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to > line 5, column 38: Unknown identifier 'event_ts' at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at 
org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) at > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) at > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) > at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501) at >
[jira] [Created] (FLINK-15935) Unable to use watermark when depends both on flink planner and blink planner
Jeff Zhang created FLINK-15935: -- Summary: Unable to use watermark when depends both on flink planner and blink planner Key: FLINK-15935 URL: https://issues.apache.org/jira/browse/FLINK-15935 Project: Flink Issue Type: Bug Affects Versions: 1.10.0 Reporter: Jeff Zhang Run the following code in module `flink-table-examples` (must be under this module) {code:java} /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.flink.table.examples.java; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.java.StreamTableEnvironment; /** * javadoc. 
*/ public class TableApiExample { /** * * @param args * @throws Exception */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); bsTableEnv.sqlUpdate( "CREATE TABLE sink_kafka (\n" + "status STRING,\n" + "direction STRING,\n" + "event_ts TIMESTAMP(3),\n" + "WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND\n" + ") WITH (\n" + " 'connector.type' = 'kafka', \n" + " 'connector.version' = 'universal',\n" + " 'connector.topic' = 'generated.events2',\n" + " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + " 'connector.properties.group.id' = 'testGroup',\n" + " 'format.type'='json',\n" + " 'update-mode' = 'append'\n" + ")\n"); } } {code} And hit the following error: {code:java} Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to line 5, column 38: Unknown identifier 'event_ts'Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to line 5, column 38: Unknown identifier 'event_ts' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) 
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501) at org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) at
[jira] [Commented] (FLINK-15858) Unable to use HiveCatalog and kafka together
[ https://issues.apache.org/jira/browse/FLINK-15858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17030299#comment-17030299 ] Jeff Zhang commented on FLINK-15858: [~liyu] [~gjy] This ticket means users are unable to use Kafka together with HiveCatalog when a timestamp field is involved. I believe this should be a blocker issue for 1.10 > Unable to use HiveCatalog and kafka together > > > Key: FLINK-15858 > URL: https://issues.apache.org/jira/browse/FLINK-15858 > Project: Flink > Issue Type: Bug > Components: Table SQL / Ecosystem >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > > HiveCatalog only supports timestamp(9), but Kafka only supports timestamp(3). > This makes users unable to use HiveCatalog and Kafka together > {code:java} > Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: > HiveCatalog currently only supports timestamp of precision 9 > at > org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:272) > at > org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:173) > at > org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151) > at > org.apache.flink.table.catalog.hive.util.HiveTypeUtil.toHiveTypeInfo(HiveTypeUtil.java:84) > at > org.apache.flink.table.catalog.hive.util.HiveTableUtil.createHiveColumns(HiveTableUtil.java:106) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
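The conflict can be reproduced with a DDL along these lines (a sketch using the legacy 1.10 `connector.*` options quoted elsewhere in this archive; the table, topic, and field names are illustrative). Registering such a table in a HiveCatalog fails, because HiveCatalog accepts only timestamp of precision 9 while the Kafka/JSON stack requires TIMESTAMP(3) for the event-time column:

{code:sql}
-- Works with the in-memory catalog; fails under HiveCatalog on 1.10
CREATE TABLE kafka_events (
  status STRING,
  event_ts TIMESTAMP(3),
  WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'events',
  'format.type' = 'json',
  'update-mode' = 'append'
);
{code}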
[jira] [Updated] (FLINK-15858) Unable to use HiveCatalog and kafka together
[ https://issues.apache.org/jira/browse/FLINK-15858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-15858: --- Priority: Blocker (was: Critical) > Unable to use HiveCatalog and kafka together > > > Key: FLINK-15858 > URL: https://issues.apache.org/jira/browse/FLINK-15858 > Project: Flink > Issue Type: Bug > Components: Table SQL / Ecosystem >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > > HiveCatalog only support timestamp(9), but kafka only support timestamp(3). > This make user unable to use HiveCatalog and kafka together > {code:java} > Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: > HiveCatalog currently only supports timestamp of precision 9 > at > org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:272) > at > org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:173) > at > org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151) > at > org.apache.flink.table.catalog.hive.util.HiveTypeUtil.toHiveTypeInfo(HiveTypeUtil.java:84) > at > org.apache.flink.table.catalog.hive.util.HiveTableUtil.createHiveColumns(HiveTableUtil.java:106) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15858) Unable to use HiveCatalog and kafka together
[ https://issues.apache.org/jira/browse/FLINK-15858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17030295#comment-17030295 ] Jeff Zhang commented on FLINK-15858: [~lzljs3620320] But IIRC, this issue means users are unable to use Kafka together with HiveCatalog. Doesn't that make it a blocker issue? > Unable to use HiveCatalog and kafka together > > > Key: FLINK-15858 > URL: https://issues.apache.org/jira/browse/FLINK-15858 > Project: Flink > Issue Type: Bug > Components: Table SQL / Ecosystem >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Critical > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > > HiveCatalog only supports timestamp(9), but Kafka only supports timestamp(3). > This makes users unable to use HiveCatalog and Kafka together > {code:java} > Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: > HiveCatalog currently only supports timestamp of precision 9 > at > org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:272) > at > org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:173) > at > org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151) > at > org.apache.flink.table.catalog.hive.util.HiveTypeUtil.toHiveTypeInfo(HiveTypeUtil.java:84) > at > org.apache.flink.table.catalog.hive.util.HiveTableUtil.createHiveColumns(HiveTableUtil.java:106) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15858) Unable to use HiveCatalog and kafka together
[ https://issues.apache.org/jira/browse/FLINK-15858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029495#comment-17029495 ] Jeff Zhang commented on FLINK-15858: In that case, it is a regression, and it definitely needs to be fixed in 1.10 > Unable to use HiveCatalog and kafka together > > > Key: FLINK-15858 > URL: https://issues.apache.org/jira/browse/FLINK-15858 > Project: Flink > Issue Type: Bug > Components: Table SQL / Ecosystem >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Blocker > Fix For: 1.10.0 > > > > HiveCatalog only supports timestamp(9), but Kafka only supports timestamp(3). > This makes users unable to use HiveCatalog and Kafka together > {code:java} > Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: > HiveCatalog currently only supports timestamp of precision 9 > at > org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:272) > at > org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:173) > at > org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151) > at > org.apache.flink.table.catalog.hive.util.HiveTypeUtil.toHiveTypeInfo(HiveTypeUtil.java:84) > at > org.apache.flink.table.catalog.hive.util.HiveTableUtil.createHiveColumns(HiveTableUtil.java:106) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15858) Unable to use HiveCatalog and kafka together
[ https://issues.apache.org/jira/browse/FLINK-15858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-15858: --- Priority: Blocker (was: Critical) > Unable to use HiveCatalog and kafka together > > > Key: FLINK-15858 > URL: https://issues.apache.org/jira/browse/FLINK-15858 > Project: Flink > Issue Type: Bug > Components: Table SQL / Ecosystem >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Blocker > Fix For: 1.10.0 > > > > HiveCatalog only support timestamp(9), but kafka only support timestamp(3). > This make user unable to use HiveCatalog and kafka together > {code:java} > Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: > HiveCatalog currently only supports timestamp of precision 9 > at > org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:272) > at > org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:173) > at > org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151) > at > org.apache.flink.table.catalog.hive.util.HiveTypeUtil.toHiveTypeInfo(HiveTypeUtil.java:84) > at > org.apache.flink.table.catalog.hive.util.HiveTableUtil.createHiveColumns(HiveTableUtil.java:106) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15858) Unable to use HiveCatalog and kafka together
[ https://issues.apache.org/jira/browse/FLINK-15858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17028713#comment-17028713 ] Jeff Zhang commented on FLINK-15858: \cc [~lirui] [~lzljs3620320] > Unable to use HiveCatalog and kafka together > > > Key: FLINK-15858 > URL: https://issues.apache.org/jira/browse/FLINK-15858 > Project: Flink > Issue Type: Bug >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > > HiveCatalog only support timestamp(9), but kafka only support timestamp(3). > This make user unable to use HiveCatalog and kafka together > {code:java} > Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: > HiveCatalog currently only supports timestamp of precision 9 > at > org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:272) > at > org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:173) > at > org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151) > at > org.apache.flink.table.catalog.hive.util.HiveTypeUtil.toHiveTypeInfo(HiveTypeUtil.java:84) > at > org.apache.flink.table.catalog.hive.util.HiveTableUtil.createHiveColumns(HiveTableUtil.java:106) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15858) Unable to use HiveCatalog and kafka together
Jeff Zhang created FLINK-15858: -- Summary: Unable to use HiveCatalog and kafka together Key: FLINK-15858 URL: https://issues.apache.org/jira/browse/FLINK-15858 Project: Flink Issue Type: Bug Affects Versions: 1.10.0 Reporter: Jeff Zhang HiveCatalog only supports timestamp(9), but Kafka only supports timestamp(3). This makes users unable to use HiveCatalog and Kafka together. {code:java} Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: HiveCatalog currently only supports timestamp of precision 9 at org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:272) at org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:173) at org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151) at org.apache.flink.table.catalog.hive.util.HiveTypeUtil.toHiveTypeInfo(HiveTypeUtil.java:84) at org.apache.flink.table.catalog.hive.util.HiveTableUtil.createHiveColumns(HiveTableUtil.java:106) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
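The clash can be reduced to a toy sketch (hypothetical code, not Flink's actual implementation): HiveCatalog's type mapping accepts only TIMESTAMP of precision 9, while the Kafka/JSON path produces TIMESTAMP(3), so the conversion step rejects the column exactly as the stack trace above shows.

```java
// Minimal sketch of the precision clash. The check below is a stand-in that
// mirrors the effect of HiveTypeUtil's visitor; it is NOT Flink's real code.
class TimestampPrecisionSketch {
    static final int HIVE_SUPPORTED_PRECISION = 9;   // what HiveCatalog's type mapping accepts
    static final int KAFKA_CONNECTOR_PRECISION = 3;  // what the Kafka/JSON path produces

    // Reject any timestamp precision other than 9, like the HiveCatalog conversion does.
    static void toHiveTypeInfo(int precision) {
        if (precision != HIVE_SUPPORTED_PRECISION) {
            throw new IllegalArgumentException(
                "HiveCatalog currently only supports timestamp of precision "
                    + HIVE_SUPPORTED_PRECISION);
        }
    }

    public static void main(String[] args) {
        toHiveTypeInfo(9); // a TIMESTAMP(9) column converts fine
        try {
            toHiveTypeInfo(KAFKA_CONNECTOR_PRECISION); // TIMESTAMP(3) from Kafka is rejected
        } catch (IllegalArgumentException expected) {
            System.out.println(expected.getMessage());
        }
    }
}
```

This is why the two cannot be combined: neither side can meet the other's fixed precision, so any Kafka-backed table stored through HiveCatalog hits this exception.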
[jira] [Commented] (FLINK-15644) Add support for SQL query validation
[ https://issues.apache.org/jira/browse/FLINK-15644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018592#comment-17018592 ] Jeff Zhang commented on FLINK-15644: [~fhueske] Will DDL/DML also be validated? > Add support for SQL query validation > - > > Key: FLINK-15644 > URL: https://issues.apache.org/jira/browse/FLINK-15644 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: Fabian Hueske >Priority: Major > > It would be good if the {{TableEnvironment}} would offer methods to check the > validity of SQL queries. Such a method could be used by services (CLI query > shells, notebooks, SQL UIs) that are backed by Flink and execute their > queries on Flink. > Validation should be available in two levels: > # Validation of syntax and semantics: This includes parsing the query, > checking the catalog for dbs, tables, fields, type checks for expressions and > functions, etc. This will check if the query is a valid SQL query. > # Validation that the query is supported: Checks if Flink can execute the given > query. Some syntactically and semantically valid SQL queries are not > supported, esp. in a streaming context. This requires running the optimizer. > If the optimizer generates an execution plan, the query can be executed. This > check includes the first step and is more expensive. > The reason for this separation is that the first check can be done much faster > as it does not involve calling the optimizer. Hence, it would be suitable for > fast checks in an interactive query editor. The second check might take more > time (depending on the complexity of the query) and might not be suitable for > rapid checks but only on explicit user request. > Requirements: > * validation does not modify the state of the {{TableEnvironment}}, i.e. it > does not add plan operators > * validation does not require connector dependencies > * validation can identify the update mode of a continuous query result > (append-only, upsert, retraction). > Out of scope for this issue: > * better error messages for unsupported features as suggested by FLINK-7217 -- This message was sent by Atlassian Jira (v8.3.4#803005)
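The two validation levels described above could take roughly this API shape. This is a purely hypothetical sketch: the names (`validateSyntax`, `validateExecutable`) and the trivial stub logic are invented for illustration and are not Flink API.

```java
import java.util.Optional;

// Hypothetical sketch of a two-level SQL validation API as described in FLINK-15644.
// All names and logic here are invented stand-ins, not real Flink interfaces.
class SqlValidatorSketch {

    /** Level 1: parsing + catalog/type checks; cheap, suited to interactive editors. */
    static Optional<String> validateSyntax(String query) {
        // Stand-in for a real parser/catalog check: reject only empty input.
        return query.trim().isEmpty() ? Optional.of("empty query") : Optional.empty();
    }

    /** Level 2: also runs the optimizer to check executability; more expensive. */
    static Optional<String> validateExecutable(String query) {
        Optional<String> syntaxError = validateSyntax(query); // level 2 includes level 1
        if (syntaxError.isPresent()) {
            return syntaxError;
        }
        // Stand-in for the optimizer: everything syntactically valid "plans" here.
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(validateSyntax("SELECT 1").isEmpty());  // passes the cheap check
        System.out.println(validateExecutable("").isPresent());    // rejected already at level 1
    }
}
```

The key design point the issue makes is preserved in the sketch: the cheap check is callable on its own for rapid editor feedback, while the expensive check subsumes it and is only invoked on explicit request.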
[jira] [Commented] (FLINK-15635) Allow passing a ClassLoader to EnvironmentSettings
[ https://issues.apache.org/jira/browse/FLINK-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018069#comment-17018069 ] Jeff Zhang commented on FLINK-15635: [~twalthr] Would this affect the ClassLoader in TableFactoryService? I also hit a classloader issue in Zeppelin. Although I have a workaround, it would be nice to allow users to specify the classloader explicitly. > Allow passing a ClassLoader to EnvironmentSettings > -- > > Key: FLINK-15635 > URL: https://issues.apache.org/jira/browse/FLINK-15635 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API, Table SQL / Planner >Reporter: Timo Walther >Priority: Major > > We had a couple of class loading issues in the past because people forgot to > use the right classloader in {{flink-table}}. The SQL Client executor code > hacks a classloader into the planner process by using {{wrapClassLoader}} > that sets the thread's context classloader. > Instead we should allow passing a class loader to environment settings. This > class loader can be passed to the planner and can be stored in table > environment, table config, etc. to have a consistent class loading behavior. > Having this in place should replace the need for > {{Thread.currentThread().getContextClassLoader()}} in the entire > {{flink-table}} module. -- This message was sent by Atlassian Jira (v8.3.4#803005)
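The workaround alluded to here typically amounts to the same pattern the SQL Client's {{wrapClassLoader}} uses: temporarily installing a specific ClassLoader as the thread's context classloader around a call, then restoring the original. A minimal sketch of that common pattern (the helper name is invented):

```java
import java.util.concurrent.Callable;

// Sketch of the context-classloader wrapping pattern: run an action with a given
// ClassLoader installed as the thread's context classloader, restoring it afterwards.
class WrapClassLoader {

    static <T> T withContextClassLoader(ClassLoader cl, Callable<T> action) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        current.setContextClassLoader(cl);
        try {
            return action.call();
        } finally {
            current.setContextClassLoader(original); // always restore, even on failure
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        ClassLoader isolated = new ClassLoader(original) {}; // stand-in for a user-code loader

        ClassLoader seenInside = withContextClassLoader(isolated,
            () -> Thread.currentThread().getContextClassLoader());

        System.out.println(seenInside == isolated);                                     // inside: swapped
        System.out.println(Thread.currentThread().getContextClassLoader() == original); // after: restored
    }
}
```

The fragility of this pattern, and why FLINK-15635 proposes passing the ClassLoader explicitly instead, is that it relies on every code path remembering to wrap itself; a single unwrapped lookup (e.g. in a factory discovery path) falls back to whatever classloader happens to be current.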
[jira] [Created] (FLINK-15592) Streaming sql throw hive related sql when it doesn't use any hive table
Jeff Zhang created FLINK-15592: -- Summary: Streaming sql throw hive related sql when it doesn't use any hive table Key: FLINK-15592 URL: https://issues.apache.org/jira/browse/FLINK-15592 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.10.0 Reporter: Jeff Zhang I use the following streaming SQL to query a Kafka table whose metadata is stored in the Hive metastore via HiveCatalog. But it throws a Hive-related exception, which is very confusing. SQL {code} SELECT * FROM ( SELECT *, ROW_NUMBER() OVER( ORDER BY event_ts) AS rownum FROM source_kafka) WHERE rownum <= 10 {code} Exception {code} Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. java.lang.reflect.InvocationTargetException at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:130) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464) at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:103) ... 
13 more Caused by: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException at org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:77) at org.apache.flink.table.planner.functions.utils.HiveAggSqlFunction.lambda$createReturnTypeInference$0(HiveAggSqlFunction.java:82) at org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470) at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:303) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) at org.apache.calcite.sql.SqlCallBinding.getOperandType(SqlCallBinding.java:237) at org.apache.calcite.sql.type.OrdinalReturnTypeInference.inferReturnType(OrdinalReturnTypeInference.java:40) at org.apache.calcite.sql.type.SqlTypeTransformCascade.inferReturnType(SqlTypeTransformCascade.java:54) at org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470) at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437) at org.apache.calcite.sql.SqlOverOperator.deriveType(SqlOverOperator.java:86) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:479) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4105) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3389) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at
[jira] [Updated] (FLINK-15592) Streaming sql throw hive exception when it doesn't use any hive table
[ https://issues.apache.org/jira/browse/FLINK-15592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-15592: --- Summary: Streaming sql throw hive exception when it doesn't use any hive table (was: Streaming sql throw hive related sql when it doesn't use any hive table) > Streaming sql throw hive exception when it doesn't use any hive table > - > > Key: FLINK-15592 > URL: https://issues.apache.org/jira/browse/FLINK-15592 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > I use the following streaming sql to query a kafka table whose metadata is > store in hive metastore via HiveCatalog. But it will throw hive related > exception which is very confusing. > SQL > {code} > SELECT * > FROM ( >SELECT *, > ROW_NUMBER() OVER( >ORDER BY event_ts) AS rownum >FROM source_kafka) > WHERE rownum <= 10 > {code} > Exception > {code} > Caused by: org.apache.flink.table.api.ValidationException: SQL validation > failed. java.lang.reflect.InvocationTargetException > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:130) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464) > at > org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:103) > ... 
13 more > Caused by: java.lang.RuntimeException: > java.lang.reflect.InvocationTargetException > at > org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:77) > at > org.apache.flink.table.planner.functions.utils.HiveAggSqlFunction.lambda$createReturnTypeInference$0(HiveAggSqlFunction.java:82) > at > org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470) > at > org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437) > at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:303) > at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) > at > org.apache.calcite.sql.SqlCallBinding.getOperandType(SqlCallBinding.java:237) > at > org.apache.calcite.sql.type.OrdinalReturnTypeInference.inferReturnType(OrdinalReturnTypeInference.java:40) > at > org.apache.calcite.sql.type.SqlTypeTransformCascade.inferReturnType(SqlTypeTransformCascade.java:54) > at > org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470) > at > org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437) > at > org.apache.calcite.sql.SqlOverOperator.deriveType(SqlOverOperator.java:86) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) > at > 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) > at > org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:479) > at >
[jira] [Commented] (FLINK-15566) Flink implicitly order the fields in PojoTypeInfo
[ https://issues.apache.org/jira/browse/FLINK-15566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014340#comment-17014340 ] Jeff Zhang commented on FLINK-15566: [~twalthr] When will FLIP-65 be completed ? > Flink implicitly order the fields in PojoTypeInfo > - > > Key: FLINK-15566 > URL: https://issues.apache.org/jira/browse/FLINK-15566 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > Attachments: image-2020-01-13-16-02-57-949.png > > > I don't know why flink would do that, but this cause my user defined function > behavior incorrectly if I and pojo in my udf and override getResultType > [https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85] > > Here's the udf I define. > > {code:java} > %flink > import org.apache.flink.api.java.typeutils.RowTypeInfo > import org.apache.flink.api.common.typeinfo.Types > import org.apache.flink.api.java.typeutils._ > import org.apache.flink.api.scala.typeutils._ > import org.apache.flink.api.scala._ > class Person(val age:Int, val job: String, val marital: String, val > education: String, val default: String, val balance: String, val housing: > String, val loan: String, val contact: String, val day: String, val month: > String, val duration: Int, val campaign: Int, val pdays: Int, val previous: > Int, val poutcome: String, val y: String) > class ParseFunction extends TableFunction[Person] { > def eval(line: String) { > val tokens = line.split(";") > // parse the line > if (!line.startsWith("\"age\"")) { > collect(new Person(new Integer(tokens(0).toInt), normalize(tokens(1)), > normalize(tokens(2)), normalize(tokens(3)), normalize(tokens(4)), > normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)), > normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), new > Integer(tokens(11).toInt), new Integer(tokens(12).toInt), >new 
Integer(tokens(13).toInt), new Integer(tokens(14).toInt), > normalize(tokens(15)), normalize(tokens(16 > } > } > > override def getResultType() = { > val cls = classOf[Person] > new PojoTypeInfo[Person](classOf[Person], java.util.Arrays.asList( >new PojoField(cls.getDeclaredField("age"), Types.INT), >new PojoField(cls.getDeclaredField("job"), Types.STRING), >new PojoField(cls.getDeclaredField("marital"), Types.STRING), >new PojoField(cls.getDeclaredField("education"), Types.STRING), >new PojoField(cls.getDeclaredField("default"), Types.STRING), >new PojoField(cls.getDeclaredField("balance"), Types.STRING), >new PojoField(cls.getDeclaredField("housing"), Types.STRING), >new PojoField(cls.getDeclaredField("loan"), Types.STRING), >new PojoField(cls.getDeclaredField("contact"), Types.STRING), >new PojoField(cls.getDeclaredField("day"), Types.STRING), >new PojoField(cls.getDeclaredField("month"), Types.STRING), >new PojoField(cls.getDeclaredField("duration"), Types.INT), >new PojoField(cls.getDeclaredField("campaign"), Types.INT), >new PojoField(cls.getDeclaredField("pdays"), Types.INT), >new PojoField(cls.getDeclaredField("previous"), Types.INT), >new PojoField(cls.getDeclaredField("poutcome"), Types.STRING), >new PojoField(cls.getDeclaredField("y"), Types.STRING) > )) > } > // remove the quote > private def normalize(token: String) = { > if (token.startsWith("\"")) { > token.substring(1, token.length - 1) > } else { > token > } > } > }{code} > And then I use this udf in sql but get the wrong result because flink reorder > the fields implicitly. > !image-2020-01-13-16-02-57-949.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
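The surprising behavior here appears to be that PojoTypeInfo orders the POJO fields (lexicographically by name) rather than keeping declaration order, so a UDF that assumes declaration order sees its columns shuffled. The effect can be reproduced with plain Java; this is a sketch of the sorting, not Flink's actual code:

```java
import java.util.Arrays;

// Sketch: if fields are ordered lexicographically by name (as PojoTypeInfo appears
// to do), the resulting column order can differ from the POJO's declaration order.
class PojoFieldOrderSketch {
    // Declaration order, taken from the first fields of the Person POJO in the issue.
    static final String[] DECLARED = {"age", "job", "marital", "education", "default"};

    public static void main(String[] args) {
        String[] sorted = DECLARED.clone();
        Arrays.sort(sorted); // lexicographic reordering by field name
        System.out.println(Arrays.toString(sorted));
        System.out.println(Arrays.equals(DECLARED, sorted)); // order changed
    }
}
```

In other words, a query that reads the UDF's output positionally gets `default` and `education` where it expected `job` and `marital`, which matches the wrong results reported in the attachment.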
[jira] [Created] (FLINK-15569) Incorrect sample code in udf document
Jeff Zhang created FLINK-15569: -- Summary: Incorrect sample code in udf document Key: FLINK-15569 URL: https://issues.apache.org/jira/browse/FLINK-15569 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.10.0 Reporter: Jeff Zhang Attachments: image-2020-01-13-16-59-00-022.png Should use JTuple2 instead of JTuple1 !image-2020-01-13-16-59-00-022.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15566) Flink implicitly order the fields in PojoTypeInfo
[ https://issues.apache.org/jira/browse/FLINK-15566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-15566: --- Description: I don't know why flink would do that, but this cause my user defined function behavior incorrectly if I and pojo in my udf and override getResultType [https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85] Here's the udf I define. {code:java} %flink import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.common.typeinfo.Types import org.apache.flink.api.java.typeutils._ import org.apache.flink.api.scala.typeutils._ import org.apache.flink.api.scala._ class Person(val age:Int, val job: String, val marital: String, val education: String, val default: String, val balance: String, val housing: String, val loan: String, val contact: String, val day: String, val month: String, val duration: Int, val campaign: Int, val pdays: Int, val previous: Int, val poutcome: String, val y: String) class ParseFunction extends TableFunction[Person] { def eval(line: String) { val tokens = line.split(";") // parse the line if (!line.startsWith("\"age\"")) { collect(new Person(new Integer(tokens(0).toInt), normalize(tokens(1)), normalize(tokens(2)), normalize(tokens(3)), normalize(tokens(4)), normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)), normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), new Integer(tokens(11).toInt), new Integer(tokens(12).toInt), new Integer(tokens(13).toInt), new Integer(tokens(14).toInt), normalize(tokens(15)), normalize(tokens(16 } } override def getResultType() = { val cls = classOf[Person] new PojoTypeInfo[Person](classOf[Person], java.util.Arrays.asList( new PojoField(cls.getDeclaredField("age"), Types.INT), new PojoField(cls.getDeclaredField("job"), Types.STRING), new PojoField(cls.getDeclaredField("marital"), Types.STRING), new 
PojoField(cls.getDeclaredField("education"), Types.STRING), new PojoField(cls.getDeclaredField("default"), Types.STRING), new PojoField(cls.getDeclaredField("balance"), Types.STRING), new PojoField(cls.getDeclaredField("housing"), Types.STRING), new PojoField(cls.getDeclaredField("loan"), Types.STRING), new PojoField(cls.getDeclaredField("contact"), Types.STRING), new PojoField(cls.getDeclaredField("day"), Types.STRING), new PojoField(cls.getDeclaredField("month"), Types.STRING), new PojoField(cls.getDeclaredField("duration"), Types.INT), new PojoField(cls.getDeclaredField("campaign"), Types.INT), new PojoField(cls.getDeclaredField("pdays"), Types.INT), new PojoField(cls.getDeclaredField("previous"), Types.INT), new PojoField(cls.getDeclaredField("poutcome"), Types.STRING), new PojoField(cls.getDeclaredField("y"), Types.STRING) )) } // remove the quote private def normalize(token: String) = { if (token.startsWith("\"")) { token.substring(1, token.length - 1) } else { token } } }{code} And then I use this udf in sql but get the wrong result because flink reorder the fields implicitly. !image-2020-01-13-16-02-57-949.png! was: I don't know why flink would do that, but this cause my user defined function behavior incorrectly if I and pojo in my udf and override getResultType [https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85] Here's the udf I define. 
{code:java} %flink import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.common.typeinfo.Types import org.apache.flink.api.java.typeutils._ import org.apache.flink.api.scala.typeutils._ import org.apache.flink.api.scala._ class Person(val age:Int, val job: String, val marital: String, val education: String, val default: String, val balance: String, val housing: String, val loan: String, val contact: String, val day: String, val month: String, val duration: Int, val campaign: Int, val pdays: Int, val previous: Int, val poutcome: String, val y: String) class ParseFunction extends TableFunction[Person] { def eval(line: String) { val tokens = line.split(";") // parse the line if (!line.startsWith("\"age\"")) { collect(new Person(new Integer(tokens(0).toInt), normalize(tokens(1)), normalize(tokens(2)), normalize(tokens(3)), normalize(tokens(4)), normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)), normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), new Integer(tokens(11).toInt), new Integer(tokens(12).toInt), new Integer(tokens(13).toInt), new Integer(tokens(14).toInt), normalize(tokens(15)), normalize(tokens(16 } } override def
[jira] [Updated] (FLINK-15566) Flink implicitly order the fields in PojoTypeInfo
[ https://issues.apache.org/jira/browse/FLINK-15566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-15566: --- Description: I don't know why flink would do that, but this cause my user defined function behavior incorrectly if I and pojo in my udf and override getResultType [https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85] Here's the udf I define. {code:java} %flink import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.common.typeinfo.Types import org.apache.flink.api.java.typeutils._ import org.apache.flink.api.scala.typeutils._ import org.apache.flink.api.scala._ class Person(val age:Int, val job: String, val marital: String, val education: String, val default: String, val balance: String, val housing: String, val loan: String, val contact: String, val day: String, val month: String, val duration: Int, val campaign: Int, val pdays: Int, val previous: Int, val poutcome: String, val y: String) class ParseFunction extends TableFunction[Person] { def eval(line: String) { val tokens = line.split(";") // parse the line if (!line.startsWith("\"age\"")) { collect(new Person(new Integer(tokens(0).toInt), normalize(tokens(1)), normalize(tokens(2)), normalize(tokens(3)), normalize(tokens(4)), normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)), normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), new Integer(tokens(11).toInt), new Integer(tokens(12).toInt), new Integer(tokens(13).toInt), new Integer(tokens(14).toInt), normalize(tokens(15)), normalize(tokens(16 } } override def getResultType() = { val cls = classOf[Person] new PojoTypeInfo[Person](classOf[Person], java.util.Arrays.asList( new PojoField(cls.getDeclaredField("age"), Types.INT), new PojoField(cls.getDeclaredField("job"), Types.STRING), new PojoField(cls.getDeclaredField("marital"), Types.STRING), new 
PojoField(cls.getDeclaredField("education"), Types.STRING), new PojoField(cls.getDeclaredField("default"), Types.STRING), new PojoField(cls.getDeclaredField("balance"), Types.STRING), new PojoField(cls.getDeclaredField("housing"), Types.STRING), new PojoField(cls.getDeclaredField("loan"), Types.STRING), new PojoField(cls.getDeclaredField("contact"), Types.STRING), new PojoField(cls.getDeclaredField("day"), Types.STRING), new PojoField(cls.getDeclaredField("month"), Types.STRING), new PojoField(cls.getDeclaredField("duration"), Types.INT), new PojoField(cls.getDeclaredField("campaign"), Types.INT), new PojoField(cls.getDeclaredField("pdays"), Types.INT), new PojoField(cls.getDeclaredField("previous"), Types.INT), new PojoField(cls.getDeclaredField("poutcome"), Types.STRING), new PojoField(cls.getDeclaredField("y"), Types.STRING) )) } // remove the quote private def normalize(token: String) = { if (token.startsWith("\"")) { token.substring(1, token.length - 1) } else { token } } }{code} And then I use this udf in sql but get the wrong result because the flink reorder the fields implicitly. !image-2020-01-13-16-02-57-949.png! was: I don't know why flink would do that, but this cause my user defined function behavior incorrectly if I and pojo in my udf and override getResultType [https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85] Here's the udf I define. 
{code:java} %flink import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.common.typeinfo.Types import org.apache.flink.api.java.typeutils._ import org.apache.flink.api.scala.typeutils._ import org.apache.flink.api.scala._ class Person(val age:Int, val job: String, val marital: String, val education: String, val default: String, val balance: String, val housing: String, val loan: String, val contact: String, val day: String, val month: String, val duration: Int, val campaign: Int, val pdays: Int, val previous: Int, val poutcome: String, val y: String) class ParseFunction extends TableFunction[Person] { def eval(line: String) { val tokens = line.split(";") // parse the line if (!line.startsWith("\"age\"")) { collect(Row.of(new Integer(tokens(0).toInt), normalize(tokens(1)), normalize(tokens(2)), normalize(tokens(3)), normalize(tokens(4)), normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)), normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), new Integer(tokens(11).toInt), new Integer(tokens(12).toInt), new Integer(tokens(13).toInt), new Integer(tokens(14).toInt), normalize(tokens(15)), normalize(tokens(16 } } override def
[jira] [Commented] (FLINK-15566) Flink implicitly order the fields in PojoTypeInfo
[ https://issues.apache.org/jira/browse/FLINK-15566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014100#comment-17014100 ] Jeff Zhang commented on FLINK-15566: [~aljoscha] Could you take a look at this issue? It seems you are the original author of that piece of code. > Flink implicitly order the fields in PojoTypeInfo > - > > Key: FLINK-15566 > URL: https://issues.apache.org/jira/browse/FLINK-15566 > Project: Flink > Issue Type: Improvement > Components: API / Core > Affects Versions: 1.10.0 > Reporter: Jeff Zhang > Priority: Major > Attachments: image-2020-01-13-16-02-57-949.png > > > I don't know why Flink does this, but it causes my user-defined function to behave incorrectly when I use a POJO in my UDF and override getResultType. > [https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85] > > Here's the UDF I define. > > {code:java} > %flink > import org.apache.flink.api.java.typeutils.RowTypeInfo > import org.apache.flink.api.common.typeinfo.Types > import org.apache.flink.api.java.typeutils._ > import org.apache.flink.api.scala.typeutils._ > import org.apache.flink.api.scala._ > class Person(val age: Int, val job: String, val marital: String, val education: String, val default: String, val balance: String, val housing: String, val loan: String, val contact: String, val day: String, val month: String, val duration: Int, val campaign: Int, val pdays: Int, val previous: Int, val poutcome: String, val y: String) > class ParseFunction extends TableFunction[Person] { > def eval(line: String) { > val tokens = line.split(";") > // parse the line, skipping the header row > if (!line.startsWith("\"age\"")) { > collect(Row.of(new Integer(tokens(0).toInt), normalize(tokens(1)), normalize(tokens(2)), normalize(tokens(3)), normalize(tokens(4)), normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)), normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), new Integer(tokens(11).toInt), new Integer(tokens(12).toInt), new Integer(tokens(13).toInt), new Integer(tokens(14).toInt), normalize(tokens(15)), normalize(tokens(16)))) > } > } > > override def getResultType() = { > val cls = classOf[Person] > new PojoTypeInfo[Person](classOf[Person], java.util.Arrays.asList( > new PojoField(cls.getDeclaredField("age"), Types.INT), > new PojoField(cls.getDeclaredField("job"), Types.STRING), > new PojoField(cls.getDeclaredField("marital"), Types.STRING), > new PojoField(cls.getDeclaredField("education"), Types.STRING), > new PojoField(cls.getDeclaredField("default"), Types.STRING), > new PojoField(cls.getDeclaredField("balance"), Types.STRING), > new PojoField(cls.getDeclaredField("housing"), Types.STRING), > new PojoField(cls.getDeclaredField("loan"), Types.STRING), > new PojoField(cls.getDeclaredField("contact"), Types.STRING), > new PojoField(cls.getDeclaredField("day"), Types.STRING), > new PojoField(cls.getDeclaredField("month"), Types.STRING), > new PojoField(cls.getDeclaredField("duration"), Types.INT), > new PojoField(cls.getDeclaredField("campaign"), Types.INT), > new PojoField(cls.getDeclaredField("pdays"), Types.INT), > new PojoField(cls.getDeclaredField("previous"), Types.INT), > new PojoField(cls.getDeclaredField("poutcome"), Types.STRING), > new PojoField(cls.getDeclaredField("y"), Types.STRING) > )) > } > // remove the surrounding quotes > private def normalize(token: String) = { > if (token.startsWith("\"")) { > token.substring(1, token.length - 1) > } else { > token > } > } > }{code} > I then use this UDF in SQL but get the wrong result because Flink reorders the fields implicitly. > !image-2020-01-13-16-02-57-949.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
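For context on why the hand-built field list above ends up mismatched: PojoTypeInfo sorts a POJO's fields alphabetically by name rather than keeping declaration order. A minimal stdlib-only sketch of that sorting behavior (the Person class here is a trimmed stand-in for the one in the issue, not Flink code):

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class FieldOrderDemo {
    // Trimmed stand-in for the Person POJO from the issue.
    public static class Person {
        public int age;
        public String job;
        public String marital;
        public int duration;
    }

    // PojoTypeInfo-style ordering: fields sorted alphabetically by name,
    // regardless of the order they were declared in.
    public static List<String> sortedFieldNames(Class<?> cls) {
        List<String> names = new ArrayList<>();
        for (Field f : cls.getDeclaredFields()) {
            names.add(f.getName());
        }
        names.sort(Comparator.naturalOrder());
        return names;
    }

    public static void main(String[] args) {
        // Declaration order is age, job, marital, duration, but sorting
        // moves "duration" ahead of "job" and "marital".
        System.out.println(sortedFieldNames(Person.class)); // prints [age, duration, job, marital]
    }
}
```

This is why a getResultType that lists fields in declaration order can disagree with the ordering Flink actually uses at runtime.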
[jira] [Created] (FLINK-15566) Flink implicitly order the fields in PojoTypeInfo
Jeff Zhang created FLINK-15566: -- Summary: Flink implicitly order the fields in PojoTypeInfo Key: FLINK-15566 URL: https://issues.apache.org/jira/browse/FLINK-15566 Project: Flink Issue Type: Improvement Components: API / Core Affects Versions: 1.10.0 Reporter: Jeff Zhang Attachments: image-2020-01-13-16-02-57-949.png

I don't know why Flink does this, but it causes my user-defined function to behave incorrectly when I use a POJO in my UDF and override getResultType: [https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85]

Here's the UDF I define.

{code:java}
%flink
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.scala.typeutils._
import org.apache.flink.api.scala._

class Person(val age: Int, val job: String, val marital: String, val education: String, val default: String, val balance: String, val housing: String, val loan: String, val contact: String, val day: String, val month: String, val duration: Int, val campaign: Int, val pdays: Int, val previous: Int, val poutcome: String, val y: String)

class ParseFunction extends TableFunction[Person] {
  def eval(line: String) {
    val tokens = line.split(";")
    // parse the line, skipping the header row
    if (!line.startsWith("\"age\"")) {
      collect(Row.of(new Integer(tokens(0).toInt), normalize(tokens(1)), normalize(tokens(2)), normalize(tokens(3)), normalize(tokens(4)), normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)), normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), new Integer(tokens(11).toInt), new Integer(tokens(12).toInt), new Integer(tokens(13).toInt), new Integer(tokens(14).toInt), normalize(tokens(15)), normalize(tokens(16))))
    }
  }

  override def getResultType() = {
    val cls = classOf[Person]
    new PojoTypeInfo[Person](classOf[Person], java.util.Arrays.asList(
      new PojoField(cls.getDeclaredField("age"), Types.INT),
      new PojoField(cls.getDeclaredField("job"), Types.STRING),
      new PojoField(cls.getDeclaredField("marital"), Types.STRING),
      new PojoField(cls.getDeclaredField("education"), Types.STRING),
      new PojoField(cls.getDeclaredField("default"), Types.STRING),
      new PojoField(cls.getDeclaredField("balance"), Types.STRING),
      new PojoField(cls.getDeclaredField("housing"), Types.STRING),
      new PojoField(cls.getDeclaredField("loan"), Types.STRING),
      new PojoField(cls.getDeclaredField("contact"), Types.STRING),
      new PojoField(cls.getDeclaredField("day"), Types.STRING),
      new PojoField(cls.getDeclaredField("month"), Types.STRING),
      new PojoField(cls.getDeclaredField("duration"), Types.INT),
      new PojoField(cls.getDeclaredField("campaign"), Types.INT),
      new PojoField(cls.getDeclaredField("pdays"), Types.INT),
      new PojoField(cls.getDeclaredField("previous"), Types.INT),
      new PojoField(cls.getDeclaredField("poutcome"), Types.STRING),
      new PojoField(cls.getDeclaredField("y"), Types.STRING)
    ))
  }

  // remove the surrounding quotes
  private def normalize(token: String) = {
    if (token.startsWith("\"")) {
      token.substring(1, token.length - 1)
    } else {
      token
    }
  }
}
{code}

I then use this UDF in SQL but get the wrong result because Flink reorders the fields implicitly.

!image-2020-01-13-16-02-57-949.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15562) Unable to create walk through project
[ https://issues.apache.org/jira/browse/FLINK-15562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-15562: --- Component/s: Documentation > Unable to create walk through project > - > > Key: FLINK-15562 > URL: https://issues.apache.org/jira/browse/FLINK-15562 > Project: Flink > Issue Type: Bug > Components: Documentation > Affects Versions: 1.10.0 > Reporter: Jeff Zhang > Priority: Major > > I tried to follow the instructions here to create the Flink walk-through project, but hit the following errors. > [https://ci.apache.org/projects/flink/flink-docs-master/getting-started/walkthroughs/table_api.html] > {code:java} > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-archetype-plugin:3.0.1:generate (default-cli) > on project standalone-pom: archetypeCatalog > 'https://repository.apache.org/content/repositories/snapshots/' is not > supported anymore. Please read the plugin documentation for details. -> [Help > 1] > [ERROR] > [ERROR] To see the full stack trace of the errors, re-run Maven with the -e > switch. > [ERROR] Re-run Maven using the -X switch to enable full debug logging. > [ERROR] > [ERROR] For more information about the errors and possible solutions, please > read the following articles: > [ERROR] [Help 1] > http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15562) Unable to create walk through project
Jeff Zhang created FLINK-15562: -- Summary: Unable to create walk through project Key: FLINK-15562 URL: https://issues.apache.org/jira/browse/FLINK-15562 Project: Flink Issue Type: Bug Affects Versions: 1.10.0 Reporter: Jeff Zhang I tried to follow the instructions here to create the Flink walk-through project, but hit the following errors. [https://ci.apache.org/projects/flink/flink-docs-master/getting-started/walkthroughs/table_api.html] {code:java} [ERROR] Failed to execute goal org.apache.maven.plugins:maven-archetype-plugin:3.0.1:generate (default-cli) on project standalone-pom: archetypeCatalog 'https://repository.apache.org/content/repositories/snapshots/' is not supported anymore. Please read the plugin documentation for details. -> [Help 1] [ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. [ERROR] Re-run Maven using the -X switch to enable full debug logging. [ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles: [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
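For what it's worth, the error above comes from maven-archetype-plugin 3.x dropping support for the `-DarchetypeCatalog` command-line option that the walkthrough docs passed. One commonly suggested workaround (a sketch, not the officially documented fix; the profile and repository ids here are arbitrary) is to make the snapshot repository visible via `~/.m2/settings.xml` and run `mvn archetype:generate` without the flag:

```xml
<!-- ~/.m2/settings.xml: expose the Apache snapshot repository to archetype
     resolution instead of passing -DarchetypeCatalog (removed in plugin 3.x). -->
<settings>
  <profiles>
    <profile>
      <id>apache-snapshots</id>
      <repositories>
        <repository>
          <id>apache-snapshots</id>
          <url>https://repository.apache.org/content/repositories/snapshots/</url>
          <releases><enabled>false</enabled></releases>
          <snapshots><enabled>true</enabled></snapshots>
        </repository>
      </repositories>
    </profile>
  </profiles>
  <activeProfiles>
    <activeProfile>apache-snapshots</activeProfile>
  </activeProfiles>
</settings>
```

The eventual documentation fix in Flink was simply to stop passing the unsupported option to the plugin.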
[jira] [Updated] (FLINK-15384) Expose JobListener API on TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-15384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-15384: --- Summary: Expose JobListener API on TableEnvironment (was: Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment ) > Expose JobListener API on TableEnvironment > -- > > Key: FLINK-15384 > URL: https://issues.apache.org/jira/browse/FLINK-15384 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API > Affects Versions: 1.10.0 > Reporter: Jeff Zhang > Priority: Major > > With the new approach of creating a TableEnvironment, it is not possible to pass an ExecutionEnvironment/StreamExecutionEnvironment to the TableEnvironment. This means that none of the ExecutionEnvironment/StreamExecutionEnvironment features, such as adding a JobListener or executing a job asynchronously, are available through the Table API. So I suggest allowing an ExecutionEnvironment/StreamExecutionEnvironment to be passed to the TableEnvironment. > {code} > EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings); > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15439) Incorrect SQL Document about DDL support
Jeff Zhang created FLINK-15439: -- Summary: Incorrect SQL Document about DDL support Key: FLINK-15439 URL: https://issues.apache.org/jira/browse/FLINK-15439 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.10.0 Reporter: Jeff Zhang Attachments: image-2019-12-31-09-45-02-044.png !image-2019-12-31-09-45-02-044.png! DDL is now supported; the document should be updated. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15395) No API for execute insert into statement
[ https://issues.apache.org/jira/browse/FLINK-15395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003500#comment-17003500 ] Jeff Zhang commented on FLINK-15395: [~ykt836] Here's how sql-client executes `insert into`. This is too complex. (Besides calling sqlUpdate, I also need to do other complex stuff.) https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L572 > No API for execute insert into statement > - > > Key: FLINK-15395 > URL: https://issues.apache.org/jira/browse/FLINK-15395 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API > Affects Versions: 1.10.0 > Reporter: Jeff Zhang > Priority: Major > > IIUC, TableEnv#sqlUpdate is used for DDL & DML while TableEnv#sqlQuery is used for select statements. Unfortunately, it seems `insert into` is a special case for which no simple API can be used. > The code implementing `insert into` in sql-client is pretty complex; it would be nice to have one simple API for executing `insert into`: > https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L572 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15395) No API for execute insert into statement
[ https://issues.apache.org/jira/browse/FLINK-15395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003497#comment-17003497 ] Jeff Zhang commented on FLINK-15395: Thanks [~jark] [~ykt836], `sqlUpdate` is the best option; unfortunately, it currently cannot be done that way. > No API for execute insert into statement > - > > Key: FLINK-15395 > URL: https://issues.apache.org/jira/browse/FLINK-15395 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API > Affects Versions: 1.10.0 > Reporter: Jeff Zhang > Priority: Major > > IIUC, TableEnv#sqlUpdate is used for DDL & DML while TableEnv#sqlQuery is used for select statements. Unfortunately, it seems `insert into` is a special case for which no simple API can be used. > The code implementing `insert into` in sql-client is pretty complex; it would be nice to have one simple API for executing `insert into`: > https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L572 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15395) No API for execute insert into statement
[ https://issues.apache.org/jira/browse/FLINK-15395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-15395: --- Description: IIUC, TableEnv#sqlUpdate is used for DDL & DML while TableEnv#sqlQuery is used for select statement. Unfortunately, it seems `insert into` is a special case that no simple api can be used for it. The code of implementing `insert into` in sql-client is pretty complex, it would be nice to have one simple api for executing `insert into` https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L572 was: IIUC, TableEnv#sqlUpdate is used for DDL while TableEnv#sqlQuery is used for DML. Unfortunately, it seems `insert into` is a special case that no simple api can be used for it. The code of implementing `insert into` in sql-client is pretty complex, it would be nice to have one simple api for executing `insert into` https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L572 > No API for execute insert into statement > - > > Key: FLINK-15395 > URL: https://issues.apache.org/jira/browse/FLINK-15395 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > IIUC, TableEnv#sqlUpdate is used for DDL & DML while TableEnv#sqlQuery is > used for select statement. Unfortunately, it seems `insert into` is a special > case that no simple api can be used for it. > The code of implementing `insert into` in sql-client is pretty complex, it > would be nice to have one simple api for executing `insert into` > https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L572 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15395) No API for execute insert into statement
[ https://issues.apache.org/jira/browse/FLINK-15395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-15395: --- Issue Type: Improvement (was: Bug) > No API for execute insert into statement > - > > Key: FLINK-15395 > URL: https://issues.apache.org/jira/browse/FLINK-15395 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > IIUC, TableEnv#sqlUpdate is used for DDL while TableEnv#sqlQuery is used for > DML. Unfortunately, it seems `insert into` is a special case that no simple > api can be used for it. > The code of implementing `insert into` in sql-client is pretty complex, it > would be nice to have one simple api for executing `insert into` > https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L572 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15395) No API for execute insert into statement
[ https://issues.apache.org/jira/browse/FLINK-15395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003474#comment-17003474 ] Jeff Zhang commented on FLINK-15395: \cc [~tiwalter] [~aljoscha] > No API for execute insert into statement > - > > Key: FLINK-15395 > URL: https://issues.apache.org/jira/browse/FLINK-15395 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > IIUC, TableEnv#sqlUpdate is used for DDL while TableEnv#sqlQuery is used for > DML. Unfortunately, it seems `insert into` is a special case that no simple > api can be used for it. > The code of implementing `insert into` in sql-client is pretty complex, it > would be nice to have one simple api for executing `insert into` > https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L572 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15395) No API for execute insert into statement
Jeff Zhang created FLINK-15395: -- Summary: No API for execute insert into statement Key: FLINK-15395 URL: https://issues.apache.org/jira/browse/FLINK-15395 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.10.0 Reporter: Jeff Zhang IIUC, TableEnv#sqlUpdate is used for DDL while TableEnv#sqlQuery is used for DML. Unfortunately, it seems `insert into` is a special case that no simple api can be used for it. The code of implementing `insert into` in sql-client is pretty complex, it would be nice to have one simple api for executing `insert into` https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L572 -- This message was sent by Atlassian Jira (v8.3.4#803005)
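For reference, the Flink 1.10-era way to run an INSERT INTO through the Table API was sqlUpdate followed by an explicit execute. This is a hedged sketch only, not runnable as-is (it needs the flink-table dependencies, and the table names `src` and `sink` are hypothetical placeholders for already-registered tables):

```java
// Sketch only: assumes flink-table on the classpath and tables src/sink
// already registered in the catalog.
EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// sqlUpdate buffers the INSERT INTO statement; nothing runs yet.
tEnv.sqlUpdate("INSERT INTO sink SELECT * FROM src");

// execute() actually submits the buffered statements as a job.
tEnv.execute("insert-into-job");
```

The complexity the comment points at is that sql-client could not rely on this simple two-call pattern and had to assemble the job itself in LocalExecutor.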
[jira] [Comment Edited] (FLINK-15384) Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-15384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003080#comment-17003080 ] Jeff Zhang edited comment on FLINK-15384 at 12/25/19 5:31 AM: -- [~ykt836] I can do it for StreamTableEnvironment, but there's no such api for BatchTableEnvironment. And I guess using TableEnvironment.create is the preferred way. BTW, the document of creating TableEnvironment is really very confusing. There're so many different approaches for creating different TableEnvironment (flink/blink planner, stream/batch) https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment was (Author: zjffdu): [~ykt836] I can do for StreamTableEnvironment, but there's no such api for BatchTableEnvironment. And I guess using TableEnvironment.create is the preferred way. BTW, the document of creating TableEnvironment is really very confusing. There're so many different approaches for creating different TableEnvironment (flink/blink planner, stream/batch) https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment > Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to > TableEnvironment > -- > > Key: FLINK-15384 > URL: https://issues.apache.org/jira/browse/FLINK-15384 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > In the new approach of creating TableEnvironement, it is not possible to pass > ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironement. This > cause the all the features in ExecutionEnvironment/StreamExecutionEnvironment > is not available in table api. Such as add JobListener, execute Job async. 
So > I suggest to allow to pass ExecutionEnvironment/StreamExecutionEnvironment to > TableEnvironment > {code} > EnvironmentSettings bbSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings); > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15384) Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-15384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003080#comment-17003080 ] Jeff Zhang commented on FLINK-15384: [~ykt836] I can do for StreamTableEnvironment, but there's no such api for BatchTableEnvironment. And I guess using TableEnvironment.create is the preferred way. BTW, the document of creating TableEnvironment is really very confusing. There're so many different approaches for creating different TableEnvironment (flink/blink planner, stream/batch) https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment > Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to > TableEnvironment > -- > > Key: FLINK-15384 > URL: https://issues.apache.org/jira/browse/FLINK-15384 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > In the new approach of creating TableEnvironement, it is not possible to pass > ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironement. This > cause the all the features in ExecutionEnvironment/StreamExecutionEnvironment > is not available in table api. Such as add JobListener, execute Job async. So > I suggest to allow to pass ExecutionEnvironment/StreamExecutionEnvironment to > TableEnvironment > {code} > EnvironmentSettings bbSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings); > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-15384) Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-15384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17002847#comment-17002847 ] Jeff Zhang edited comment on FLINK-15384 at 12/24/19 2:01 PM: -- \cc [~aljoscha] [~kkloudas] [~tison] [~lzljs3620320] was (Author: zjffdu): \cc [~aljoscha] [~kkloudas] [~tison] > Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to > TableEnvironment > -- > > Key: FLINK-15384 > URL: https://issues.apache.org/jira/browse/FLINK-15384 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > In the new approach of creating TableEnvironement, it is not possible to pass > ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironement. This > cause the all the features in ExecutionEnvironment/StreamExecutionEnvironment > is not available in table api. Such as add JobListener, execute Job async. So > I suggest to allow to pass ExecutionEnvironment/StreamExecutionEnvironment to > TableEnvironment > {code} > EnvironmentSettings bbSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings); > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-15384) Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-15384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17002847#comment-17002847 ] Jeff Zhang edited comment on FLINK-15384 at 12/24/19 2:02 PM: -- \cc [~aljoscha] [~kkloudas] [~tison] [~lzljs3620320] [~twalthr] was (Author: zjffdu): \cc [~aljoscha] [~kkloudas] [~tison] [~lzljs3620320] > Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to > TableEnvironment > -- > > Key: FLINK-15384 > URL: https://issues.apache.org/jira/browse/FLINK-15384 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > In the new approach of creating TableEnvironement, it is not possible to pass > ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironement. This > cause the all the features in ExecutionEnvironment/StreamExecutionEnvironment > is not available in table api. Such as add JobListener, execute Job async. So > I suggest to allow to pass ExecutionEnvironment/StreamExecutionEnvironment to > TableEnvironment > {code} > EnvironmentSettings bbSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings); > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15384) Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-15384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17002847#comment-17002847 ] Jeff Zhang commented on FLINK-15384: \cc [~aljoscha] [~kkloudas] [~tison] > Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to > TableEnvironment > -- > > Key: FLINK-15384 > URL: https://issues.apache.org/jira/browse/FLINK-15384 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > In the new approach of creating TableEnvironement, it is not possible to pass > ExecutionEnvironment/StreamExecutionEnvironment. This makes the all the > features in ExecutionEnvironment/StreamExecutionEnvironment is not available > in table api. Such as add JobListener, execute Job async. So I suggest to > allow to pass ExecutionEnvironment/StreamExecutionEnvironment to > TableEnvironment > {code} > EnvironmentSettings bbSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings); > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15384) Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-15384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-15384: --- Description: In the new approach of creating TableEnvironement, it is not possible to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironement. This cause the all the features in ExecutionEnvironment/StreamExecutionEnvironment is not available in table api. Such as add JobListener, execute Job async. So I suggest to allow to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment {code} EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings); {code} was: In the new approach of creating TableEnvironement, it is not possible to pass ExecutionEnvironment/StreamExecutionEnvironment. This makes the all the features in ExecutionEnvironment/StreamExecutionEnvironment is not available in table api. Such as add JobListener, execute Job async. So I suggest to allow to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment {code} EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings); {code} > Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to > TableEnvironment > -- > > Key: FLINK-15384 > URL: https://issues.apache.org/jira/browse/FLINK-15384 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: Jeff Zhang >Priority: Major > > In the new approach of creating TableEnvironement, it is not possible to pass > ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironement. This > cause the all the features in ExecutionEnvironment/StreamExecutionEnvironment > is not available in table api. Such as add JobListener, execute Job async. 
So > I suggest to allow to pass ExecutionEnvironment/StreamExecutionEnvironment to > TableEnvironment > {code} > EnvironmentSettings bbSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings); > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15384) Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment
Jeff Zhang created FLINK-15384: -- Summary: Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment Key: FLINK-15384 URL: https://issues.apache.org/jira/browse/FLINK-15384 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.10.0 Reporter: Jeff Zhang With the new approach of creating a TableEnvironment, it is not possible to pass an ExecutionEnvironment/StreamExecutionEnvironment. This means that none of the ExecutionEnvironment/StreamExecutionEnvironment features, such as adding a JobListener or executing a job asynchronously, are available through the Table API. So I suggest allowing an ExecutionEnvironment/StreamExecutionEnvironment to be passed to the TableEnvironment {code} EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings); {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
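The feature being requested can be illustrated against StreamExecutionEnvironment, where JobListener registration already exists but is unreachable from a TableEnvironment created via TableEnvironment.create(...). A hedged, non-runnable sketch (requires flink-streaming-java; the listener body is illustrative only):

```java
// Sketch only: JobListener is available on StreamExecutionEnvironment,
// which is exactly what TableEnvironment.create(...) does not expose.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.registerJobListener(new JobListener() {
    @Override
    public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable t) {
        if (jobClient != null) {
            System.out.println("Submitted job " + jobClient.getJobID());
        }
    }

    @Override
    public void onJobExecuted(@Nullable JobExecutionResult result, @Nullable Throwable t) {
        System.out.println("Job finished");
    }
});
```

Allowing the environment to be passed into (or exposed by) TableEnvironment would make this hook usable for Table API jobs as well.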