[jira] [Created] (FLINK-31667) Should not enforce attached mode in RemoteEnvironment & RemoteStreamEnvironment

2023-03-29 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-31667:
--

 Summary: Should not enforce attached mode in RemoteEnvironment & 
RemoteStreamEnvironment
 Key: FLINK-31667
 URL: https://issues.apache.org/jira/browse/FLINK-31667
 Project: Flink
  Issue Type: Improvement
  Components: Client / Job Submission
Reporter: Jeff Zhang


Currently, Flink enforces attached mode in `RemoteEnvironment` & 
`RemoteStreamEnvironment` even when the user tries to use detached mode. This 
doesn't make sense to me.

* 
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java#L151

* 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java#L211
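
A minimal sketch of the symptom, assuming the standard `DeploymentOptions` and 
`createRemoteEnvironment` APIs (host, port, and job name are placeholders):
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// The user explicitly asks for detached mode ...
Configuration clientConf = new Configuration();
clientConf.set(DeploymentOptions.ATTACHED, false);

StreamExecutionEnvironment env = StreamExecutionEnvironment
        .createRemoteEnvironment("jobmanager-host", 8081, clientConf);

// ... but the lines linked above force ATTACHED back to true before
// submission, so execute() still blocks until the job finishes.
env.execute("my-job");
{code}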

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-21607) Twitter table source connector

2021-03-04 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-21607:
--

 Summary: Twitter table source connector
 Key: FLINK-21607
 URL: https://issues.apache.org/jira/browse/FLINK-21607
 Project: Flink
  Issue Type: New Feature
Affects Versions: 1.12.2
Reporter: Jeff Zhang


It would be nice to have such a Flink Twitter table source connector. This is 
especially useful to demo Flink SQL examples. 
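
A purely hypothetical sketch of how such a connector might be used; the 
`twitter` connector identifier and its option keys are invented for 
illustration and do not exist in Flink:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// Hypothetical DDL: the 'twitter' connector and its options are assumptions.
tEnv.executeSql(
    "CREATE TABLE tweets ("
        + "  id BIGINT,"
        + "  user_name STRING,"
        + "  tweet_text STRING,"
        + "  created_at TIMESTAMP(3)"
        + ") WITH ("
        + "  'connector' = 'twitter',"
        + "  'credentials.consumer-key' = '...'"
        + ")");

// A typical demo query over the live stream.
tEnv.executeSql("SELECT user_name, COUNT(*) AS cnt FROM tweets GROUP BY user_name").print();
{code}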

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21607) Twitter table source connector

2021-03-04 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-21607:
---
Description: 
It would be nice to have such a Flink Twitter table source connector. This is 
especially useful for demoing Flink SQL examples. 

 

  was:
It would be nice to have such flink twitter table source connector. This is 
especially useful to demo flink sql examples. 

 


> Twitter table source connector
> --
>
> Key: FLINK-21607
> URL: https://issues.apache.org/jira/browse/FLINK-21607
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.12.2
>Reporter: Jeff Zhang
>Priority: Major
>
> It would be nice to have such a Flink Twitter table source connector. This is 
> especially useful for demoing Flink SQL examples. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17788) scala shell in yarn mode is broken

2020-06-10 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-17788:
---
Affects Version/s: 1.10.1

> scala shell in yarn mode is broken
> --
>
> Key: FLINK-17788
> URL: https://issues.apache.org/jira/browse/FLINK-17788
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.10.1, 1.11.0
>Reporter: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>
> When I start the scala shell in yarn mode, one yarn app will be launched, and 
> after I write some flink code and trigger a flink job, another yarn app will 
> be launched but will fail to start due to some conflicts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18167) Flink Job hangs when one vertex has failed and another is cancelled.

2020-06-06 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-18167:
--

 Summary: Flink Job hangs when one vertex has failed and 
another is cancelled. 
 Key: FLINK-18167
 URL: https://issues.apache.org/jira/browse/FLINK-18167
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.10.0
Reporter: Jeff Zhang
 Attachments: image-2020-06-06-15-39-35-441.png

After I call cancel with savepoint, the cancel operation fails. The 
following is what I see on the client side. 
{code:java}

WARN [2020-06-06 13:45:16,003] ({Thread-1241} JobManager.java[cancelJob]:137) - 
Fail to cancel job 7e5492f35c1a7f5dad7c805ba943ea52 that is associated with 
paragraph paragraph_1586733868269_783581378
java.util.concurrent.ExecutionException: 
java.util.concurrent.CompletionException: 
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator 
is suspending.
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.zeppelin.flink.JobManager.cancelJob(JobManager.java:129)
at 
org.apache.zeppelin.flink.FlinkScalaInterpreter.cancel(FlinkScalaInterpreter.scala:648)
at 
org.apache.zeppelin.flink.FlinkInterpreter.cancel(FlinkInterpreter.java:101)
at 
org.apache.zeppelin.interpreter.LazyOpenInterpreter.cancel(LazyOpenInterpreter.java:119)
at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer.lambda$cancel$1(RemoteInterpreterServer.java:800)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: 
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator 
is suspending.
at 
org.apache.flink.runtime.scheduler.SchedulerBase.lambda$stopWithSavepoint$9(SchedulerBase.java:873)
at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator 
is suspending.
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerSynchronousSavepoint$0(CheckpointCoordinator.java:428)
at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$1(CheckpointCoordinator.java:457)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at 

[jira] [Commented] (FLINK-17788) scala shell in yarn mode is broken

2020-06-01 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17120871#comment-17120871
 ] 

Jeff Zhang commented on FLINK-17788:


[~kkl0u] The second yarn app fails to launch due to a port conflict; here's what I 
see. But anyway, the scala shell should not launch another yarn app; instead it 
should use only one yarn session cluster.

{code}
2020-06-01 17:09:18,860 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting 
YarnJobClusterEntrypoint down with application status FAILED. Diagnostics 
org.apache.flink.util.FlinkException: Could not create the 
DispatcherResourceManagerComponent.
at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:255)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:216)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:516)
at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:89)
Caused by: java.net.BindException: Could not start rest endpoint on any port in 
port range 57574
at 
org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:222)
at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:163)
... 9 more
{code}

> scala shell in yarn mode is broken
> --
>
> Key: FLINK-17788
> URL: https://issues.apache.org/jira/browse/FLINK-17788
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.11.0
>Reporter: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>
> When I start the scala shell in yarn mode, one yarn app will be launched, and 
> after I write some flink code and trigger a flink job, another yarn app will 
> be launched but will fail to start due to some conflicts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-17788) scala shell in yarn mode is broken

2020-05-27 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118249#comment-17118249
 ] 

Jeff Zhang edited comment on FLINK-17788 at 5/28/20, 3:09 AM:
--

[~kkl0u] Yes, each time I call execute(), the scala shell will create a new 
flink session cluster. But the session cluster actually fails to launch; I 
haven't dug into it further, but I suspect it is due to some conflict with the 
previous flink session cluster.


was (Author: zjffdu):
[~kkl0u] Yes,each time when I call execute(), scala shell will create a new 
flink session cluster. But actually the session cluster will fail to launch, I 
havn't digger into it further, I suspect it  is due to some conflict with the 
preview flink session cluster.

> scala shell in yarn mode is broken
> --
>
> Key: FLINK-17788
> URL: https://issues.apache.org/jira/browse/FLINK-17788
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.11.0
>Reporter: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>
> When I start the scala shell in yarn mode, one yarn app will be launched, and 
> after I write some flink code and trigger a flink job, another yarn app will 
> be launched but will fail to start due to some conflicts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17944) Wrong output in sql client's table mode

2020-05-27 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-17944:
---
Issue Type: Bug  (was: Improvement)

> Wrong output in sql client's table mode
> ---
>
> Key: FLINK-17944
> URL: https://issues.apache.org/jira/browse/FLINK-17944
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.11.0
>Reporter: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> When I run the following sql example, I get the wrong output
> {code:java}
> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), 
> ('Bob')) AS NameTable(name) GROUP BY name; {code}
>  
> {code:java}
>   Bob 1
>  Alice 1
>   Greg 1
>Bob 2 {code}
> This is because we added a kind field to Row, so the semantics of the equals 
> method changed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-17788) scala shell in yarn mode is broken

2020-05-27 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118249#comment-17118249
 ] 

Jeff Zhang edited comment on FLINK-17788 at 5/28/20, 2:45 AM:
--

[~kkl0u] Yes, each time I call execute(), the scala shell will create a new 
flink session cluster. But the session cluster actually fails to launch; I 
haven't dug into it further, but I suspect it is due to some conflict with the 
previous flink session cluster.


was (Author: zjffdu):
Yes,each time when I call execute(), scala shell will create a new flink 
session cluster. But actually the session cluster will fail to launch, I havn't 
digger into it further, I suspect it  is due to some conflict with the preview 
flink session cluster.

> scala shell in yarn mode is broken
> --
>
> Key: FLINK-17788
> URL: https://issues.apache.org/jira/browse/FLINK-17788
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.11.0
>Reporter: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>
> When I start the scala shell in yarn mode, one yarn app will be launched, and 
> after I write some flink code and trigger a flink job, another yarn app will 
> be launched but will fail to start due to some conflicts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17788) scala shell in yarn mode is broken

2020-05-27 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118249#comment-17118249
 ] 

Jeff Zhang commented on FLINK-17788:


Yes, each time I call execute(), the scala shell will create a new flink 
session cluster. But the session cluster actually fails to launch; I haven't 
dug into it further, but I suspect it is due to some conflict with the previous 
flink session cluster.

> scala shell in yarn mode is broken
> --
>
> Key: FLINK-17788
> URL: https://issues.apache.org/jira/browse/FLINK-17788
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.11.0
>Reporter: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>
> When I start the scala shell in yarn mode, one yarn app will be launched, and 
> after I write some flink code and trigger a flink job, another yarn app will 
> be launched but will fail to start due to some conflicts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17788) scala shell in yarn mode is broken

2020-05-26 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17116822#comment-17116822
 ] 

Jeff Zhang commented on FLINK-17788:


[~kkl0u] The phenomenon I see is that when I launch the scala-shell in yarn mode, 
it launches one yarn app for the flink yarn session cluster. Then I type some 
flink code to trigger a flink job, and it launches another yarn app. The 
root cause is that the deployment option is set to yarn-per-job, which is 
incorrect. The deployment option is actually set twice, which causes the 
wrong setting. 
* 
https://github.com/apache/flink/blob/master/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala#L248
* 
https://github.com/apache/flink/blob/master/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala#L287

This is not a clean fix; I think the whole flink scala shell module needs to be 
refactored to make the code clean. 
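
A minimal sketch of the fix direction, assuming the standard 
`DeploymentOptions.TARGET` option (key `execution.target`):
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;

// The shell should pin the target to the one yarn session cluster it started,
// instead of letting a later code path overwrite it with "yarn-per-job".
Configuration conf = new Configuration();
conf.set(DeploymentOptions.TARGET, "yarn-session");

// Every subsequent execute() would then submit to the existing session
// cluster rather than spawning a second yarn application.
{code}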



> scala shell in yarn mode is broken
> --
>
> Key: FLINK-17788
> URL: https://issues.apache.org/jira/browse/FLINK-17788
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.11.0
>Reporter: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>
> When I start the scala shell in yarn mode, one yarn app will be launched, and 
> after I write some flink code and trigger a flink job, another yarn app will 
> be launched but will fail to start due to some conflicts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17944) Wrong output in sql client's table mode

2020-05-26 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17116765#comment-17116765
 ] 

Jeff Zhang commented on FLINK-17944:


I will make a PR for it

> Wrong output in sql client's table mode
> ---
>
> Key: FLINK-17944
> URL: https://issues.apache.org/jira/browse/FLINK-17944
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Affects Versions: 1.11.0
>Reporter: Jeff Zhang
>Priority: Major
>
> When I run the following sql example, I get the wrong output
> {code:java}
> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), 
> ('Bob')) AS NameTable(name) GROUP BY name; {code}
>  
> {code:java}
>   Bob 1
>  Alice 1
>   Greg 1
>Bob 2 {code}
> This is because we added a kind field to Row, so the semantics of the equals 
> method changed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17944) Wrong output in sql client's table mode

2020-05-26 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-17944:
--

 Summary: Wrong output in sql client's table mode
 Key: FLINK-17944
 URL: https://issues.apache.org/jira/browse/FLINK-17944
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Affects Versions: 1.11.0
Reporter: Jeff Zhang


When I run the following sql example, I get the wrong output
{code:java}

SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), 
('Bob')) AS NameTable(name) GROUP BY name; {code}
 
{code:java}
  Bob 1
 Alice 1
  Greg 1
   Bob 2 {code}

This is because we added a kind field to Row, so the semantics of the equals 
method changed.
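
A minimal sketch of the semantics change, assuming the `Row#setKind` API 
introduced in Flink 1.11:
{code:java}
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

Row insert = Row.of("Bob", 1L);
Row retract = Row.of("Bob", 1L);
retract.setKind(RowKind.UPDATE_BEFORE); // same fields, different change kind

// Before the kind field existed this printed true; now it prints false, so
// the sql client's table mode can fail to retract the previous row.
System.out.println(insert.equals(retract));
{code}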



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17889) flink-connector-hive jar contains wrong class in its SPI config file org.apache.flink.table.factories.TableFactory

2020-05-22 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17114191#comment-17114191
 ] 

Jeff Zhang commented on FLINK-17889:


\cc [~lirui] [~lzljs3620320]

> flink-connector-hive jar contains wrong class in its SPI config file 
> org.apache.flink.table.factories.TableFactory
> --
>
> Key: FLINK-17889
> URL: https://issues.apache.org/jira/browse/FLINK-17889
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.11.0
>Reporter: Jeff Zhang
>Priority: Major
>
> These 2 classes are in flink-connector-hive jar's SPI config file
> {code:java}
> org.apache.flink.orc.OrcFileSystemFormatFactory
> License.org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory {code}
> Due to this issue, I get the following exception on the Zeppelin side.
> {code:java}
> Caused by: java.util.ServiceConfigurationError: 
> org.apache.flink.table.factories.TableFactory: Provider 
> org.apache.flink.orc.OrcFileSystemFormatFactory not a subtypeCaused by: 
> java.util.ServiceConfigurationError: 
> org.apache.flink.table.factories.TableFactory: Provider 
> org.apache.flink.orc.OrcFileSystemFormatFactory not a subtype at 
> java.util.ServiceLoader.fail(ServiceLoader.java:239) at 
> java.util.ServiceLoader.access$300(ServiceLoader.java:185) at 
> java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376) at 
> java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) at 
> java.util.ServiceLoader$1.next(ServiceLoader.java:480) at 
> java.util.Iterator.forEachRemaining(Iterator.java:116) at 
> org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
>  ... 35 more {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17889) flink-connector-hive jar contains wrong class in its SPI config file org.apache.flink.table.factories.TableFactory

2020-05-22 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-17889:
---
Description: 
These 2 classes are in flink-connector-hive jar's SPI config file
{code:java}
org.apache.flink.orc.OrcFileSystemFormatFactory
License.org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory {code}
Due to this issue, I get the following exception on the Zeppelin side.
{code:java}
Caused by: java.util.ServiceConfigurationError: 
org.apache.flink.table.factories.TableFactory: Provider 
org.apache.flink.orc.OrcFileSystemFormatFactory not a subtypeCaused by: 
java.util.ServiceConfigurationError: 
org.apache.flink.table.factories.TableFactory: Provider 
org.apache.flink.orc.OrcFileSystemFormatFactory not a subtype at 
java.util.ServiceLoader.fail(ServiceLoader.java:239) at 
java.util.ServiceLoader.access$300(ServiceLoader.java:185) at 
java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376) at 
java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) at 
java.util.ServiceLoader$1.next(ServiceLoader.java:480) at 
java.util.Iterator.forEachRemaining(Iterator.java:116) at 
org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
 ... 35 more {code}

  was:
These 2 classes are in flink-connector-hive jar's SPI config file
{code:java}
org.apache.flink.orc.OrcFileSystemFormatFactory
License.org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory {code}
Due to this issue, I get the following exception in zeppelin side.
{code:java}

 {code}


> flink-connector-hive jar contains wrong class in its SPI config file 
> org.apache.flink.table.factories.TableFactory
> --
>
> Key: FLINK-17889
> URL: https://issues.apache.org/jira/browse/FLINK-17889
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.11.0
>Reporter: Jeff Zhang
>Priority: Major
>
> These 2 classes are in flink-connector-hive jar's SPI config file
> {code:java}
> org.apache.flink.orc.OrcFileSystemFormatFactory
> License.org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory {code}
> Due to this issue, I get the following exception on the Zeppelin side.
> {code:java}
> Caused by: java.util.ServiceConfigurationError: 
> org.apache.flink.table.factories.TableFactory: Provider 
> org.apache.flink.orc.OrcFileSystemFormatFactory not a subtypeCaused by: 
> java.util.ServiceConfigurationError: 
> org.apache.flink.table.factories.TableFactory: Provider 
> org.apache.flink.orc.OrcFileSystemFormatFactory not a subtype at 
> java.util.ServiceLoader.fail(ServiceLoader.java:239) at 
> java.util.ServiceLoader.access$300(ServiceLoader.java:185) at 
> java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376) at 
> java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) at 
> java.util.ServiceLoader$1.next(ServiceLoader.java:480) at 
> java.util.Iterator.forEachRemaining(Iterator.java:116) at 
> org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
>  ... 35 more {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17889) flink-connector-hive jar contains wrong class in its SPI config file org.apache.flink.table.factories.TableFactory

2020-05-22 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-17889:
--

 Summary: flink-connector-hive jar contains wrong class in its SPI 
config file org.apache.flink.table.factories.TableFactory
 Key: FLINK-17889
 URL: https://issues.apache.org/jira/browse/FLINK-17889
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.11.0
Reporter: Jeff Zhang


These 2 classes are in flink-connector-hive jar's SPI config file
{code:java}
org.apache.flink.orc.OrcFileSystemFormatFactory
License.org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory {code}
Due to this issue, I get the following exception on the Zeppelin side.
{code:java}

 {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17870) dependent jars are missing to be shipped to cluster in scala shell

2020-05-21 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-17870:
---
Summary: dependent jars are missing to be shipped to cluster in scala shell 
 (was: scala shell jars are missing to be shipped to cluster)

> dependent jars are missing to be shipped to cluster in scala shell
> --
>
> Key: FLINK-17870
> URL: https://issues.apache.org/jira/browse/FLINK-17870
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.11.0
>Reporter: Jeff Zhang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17870) scala shell jars are missing to be shipped to cluster

2020-05-21 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-17870:
--

 Summary: scala shell jars are missing to be shipped to cluster
 Key: FLINK-17870
 URL: https://issues.apache.org/jira/browse/FLINK-17870
 Project: Flink
  Issue Type: Bug
  Components: Scala Shell
Affects Versions: 1.11.0
Reporter: Jeff Zhang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17788) scala shell in yarn mode is broken

2020-05-18 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-17788:
--

 Summary: scala shell in yarn mode is broken
 Key: FLINK-17788
 URL: https://issues.apache.org/jira/browse/FLINK-17788
 Project: Flink
  Issue Type: Bug
  Components: Scala Shell
Affects Versions: 1.11.0
Reporter: Jeff Zhang


When I start the scala shell in yarn mode, one yarn app will be launched, and after 
I write some flink code and trigger a flink job, another yarn app will be 
launched but will fail to start due to some conflicts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17763) No log files when starting scala-shell

2020-05-16 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-17763:
--

 Summary: No log files when starting scala-shell
 Key: FLINK-17763
 URL: https://issues.apache.org/jira/browse/FLINK-17763
 Project: Flink
  Issue Type: Bug
  Components: Scala Shell
Affects Versions: 1.11.0
Reporter: Jeff Zhang


I see the following error when starting scala shell.

 
{code:java}
Starting Flink Shell:
ERROR StatusLogger No Log4j 2 configuration file found. Using default 
configuration (logging only errors to the console), or user programmatically 
provided configurations. Set system property 'log4j2.debug' to show Log4j 2 
internal initialization logging. See 
https://logging.apache.org/log4j/2.x/manual/configuration.html for instructions 
on how to configure Log4j 2 {code}
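
For reference, a minimal log4j2 properties-format configuration that would give 
the shell a working console logger if it were picked up on the classpath; 
whether the scala-shell launcher actually ships and loads such a file is 
exactly what this issue is about (assumed file name: log4j2.properties):
{code}
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c - %m%n
{code}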



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17588) Throw exception when hadoop jar is missing in flink scala shell's yarn mode.

2020-05-08 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-17588:
--

 Summary: Throw exception when hadoop jar is missing in flink 
scala shell's yarn mode.
 Key: FLINK-17588
 URL: https://issues.apache.org/jira/browse/FLINK-17588
 Project: Flink
  Issue Type: Improvement
  Components: Scala Shell
Affects Versions: 1.10.0
Reporter: Jeff Zhang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-10911) Enable flink-scala-shell with Scala 2.12

2020-04-25 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-10911:
---
Summary: Enable flink-scala-shell with Scala 2.12  (was: Flink's 
flink-scala-shell is not working with Scala 2.12)

> Enable flink-scala-shell with Scala 2.12
> 
>
> Key: FLINK-10911
> URL: https://issues.apache.org/jira/browse/FLINK-10911
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>
> Flink's {{flink-scala-shell}} module is not working with Scala 2.12. 
> Therefore, it is currently excluded from the Scala 2.12 builds.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-10911) Flink's flink-scala-shell is not working with Scala 2.12

2020-04-25 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092156#comment-17092156
 ] 

Jeff Zhang commented on FLINK-10911:


[~trohrmann] We still need to enable scala-2.12 for the flink scala shell; I will 
refine my PR to enable that. 

> Flink's flink-scala-shell is not working with Scala 2.12
> 
>
> Key: FLINK-10911
> URL: https://issues.apache.org/jira/browse/FLINK-10911
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>
> Flink's {{flink-scala-shell}} module is not working with Scala 2.12. 
> Therefore, it is currently excluded from the Scala 2.12 builds.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-10911) Flink's flink-scala-shell is not working with Scala 2.12

2020-04-24 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17091594#comment-17091594
 ] 

Jeff Zhang commented on FLINK-10911:


[~aljoscha] [~trohrmann] [~chesnay] I tested it on my local machine; the flink 
scala shell with scala 2.12 works for me. Besides, there's one PR where I enable 
the flink scala shell for the scala-2.12 profile, and the build passed. 
[https://github.com/apache/flink/pull/11895]

Do you remember what kind of issue you hit in the flink scala shell with scala-2.12?

 

> Flink's flink-scala-shell is not working with Scala 2.12
> 
>
> Key: FLINK-10911
> URL: https://issues.apache.org/jira/browse/FLINK-10911
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>
> Flink's {{flink-scala-shell}} module is not working with Scala 2.12. 
> Therefore, it is currently excluded from the Scala 2.12 builds.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-10911) Flink's flink-scala-shell is not working with Scala 2.12

2020-04-23 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17090401#comment-17090401
 ] 

Jeff Zhang commented on FLINK-10911:


I will take a look at this issue

> Flink's flink-scala-shell is not working with Scala 2.12
> 
>
> Key: FLINK-10911
> URL: https://issues.apache.org/jira/browse/FLINK-10911
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
>
> Flink's {{flink-scala-shell}} module is not working with Scala 2.12. 
> Therefore, it is currently excluded from the Scala 2.12 builds.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-17225) Support native k8s for scala shell

2020-04-17 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085889#comment-17085889
 ] 

Jeff Zhang edited comment on FLINK-17225 at 4/17/20, 3:59 PM:
--

Thanks [~fly_in_gis] 

[~chesnay] Why does scala 2.12 support block this feature? Does flink on k8s 
depend on scala 2.12? 


was (Author: zjffdu):
Thanks [~fly_in_gis] 

[~chesnay] Why does scala 2.12 support block this feature ? Does flink on k8s 
depends on scala 2.12 ? 

> Support native k8s for scala shell
> --
>
> Key: FLINK-17225
> URL: https://issues.apache.org/jira/browse/FLINK-17225
> Project: Flink
>  Issue Type: New Feature
>  Components: Scala Shell
>Reporter: Yang Wang
>Priority: Major
>
> Currently, the Flink scala shell can create a new or retrieve an existing 
> YARN session cluster automatically. It is very convenient for users.
> It would be great if we could also support K8s deployment. Benefiting from 
> the native K8s integration, the implementation is not very difficult.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17225) Support native k8s for scala shell

2020-04-17 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085889#comment-17085889
 ] 

Jeff Zhang commented on FLINK-17225:


Thanks [~fly_in_gis] 

[~chesnay] Why does scala 2.12 support block this feature? Does flink on k8s 
depend on scala 2.12? 

> Support native k8s for scala shell
> --
>
> Key: FLINK-17225
> URL: https://issues.apache.org/jira/browse/FLINK-17225
> Project: Flink
>  Issue Type: New Feature
>  Components: Scala Shell
>Reporter: Yang Wang
>Priority: Major
>
> Currently, the Flink scala shell can create a new or retrieve an existing 
> YARN session cluster automatically. It is very convenient for users.
> It would be great if we could also support K8s deployment. Benefiting from 
> the native K8s integration, the implementation is not very difficult.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16936) TableEnv creation and planner execution must be in the same thread

2020-04-16 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085046#comment-17085046
 ] 

Jeff Zhang commented on FLINK-16936:


Good, thanks for letting me know that FLIP-84 will make sql execution return after 
submission. 
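
For reference, a sketch of what this enables, assuming the FLIP-84 
`executeSql()`/`TableResult` API (Flink 1.11+; `await()` from 1.12); the table 
names are placeholders and exception handling is omitted:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// Each INSERT returns once the job is submitted, so several statements can
// be in flight from a single thread.
TableResult r1 = tEnv.executeSql("INSERT INTO sink_a SELECT * FROM src_a");
TableResult r2 = tEnv.executeSql("INSERT INTO sink_b SELECT * FROM src_b");

r1.await(); // block only when completion is actually needed
r2.await();
{code}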

> TableEnv creation and planner execution must be in the same thread 
> --
>
> Key: FLINK-16936
> URL: https://issues.apache.org/jira/browse/FLINK-16936
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> I hit this issue in zeppelin. Let me first describe the thread model of 
> zeppelin. In Zeppelin there are 3 threads: scalashell-thread is the thread 
> where the tableenv is created, python-thread is the python process thread, and 
> python-javagateway-thread is the thread handling requests from the python 
> thread (same as pyflink).
> Now if I use the following table api, I get the following exception. 
> {code:java}
> st_env.from_path("cdn_access_log")\
>.select("uuid, "
>"ip_to_province(client_ip) as province, " 
>"response_size, request_time")\
>.group_by("province")\
>.select( 
>"province, count(uuid) as access_count, " 
>"sum(response_size) as total_download,  " 
>"sum(response_size) * 1.0 / sum(request_time) as download_speed") \
>.insert_into("cdn_access_statistic") {code}
> Errors I get
> {code:java}
> Py4JJavaError: An error occurred while calling o60.insertInto.
> : java.lang.RuntimeException: Error while applying rule 
> FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args 
> [rel#107:LogicalAggregate.NONE.any.None: 
> 0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
>   at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>   at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
>   at 
> org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at py4j.Gateway.invoke(Gateway.java:282)
>   at 

[jira] [Commented] (FLINK-16936) TableEnv creation and planner execution must be in the same thread

2020-04-16 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084996#comment-17084996
 ] 

Jeff Zhang commented on FLINK-16936:


[~ykt836] Is multi-thread support not on the roadmap? IMO, this is 
necessary for batch, where users want to submit multiple sql statements 
simultaneously; otherwise they have to wait for the last sql to finish before 
submitting another. 

> TableEnv creation and planner execution must be in the same thread 
> --
>
> Key: FLINK-16936
> URL: https://issues.apache.org/jira/browse/FLINK-16936
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> I hit this issue in zeppelin. Let me first describe the thread model of 
> zeppelin. In Zeppelin there are 3 threads: scalashell-thread is the thread 
> where the tableenv is created, python-thread is the python process thread, and 
> python-javagateway-thread is the thread handling requests from the python 
> thread (same as pyflink).
> Now if I use the following table api, I get the following exception. 
> {code:java}
> st_env.from_path("cdn_access_log")\
>.select("uuid, "
>"ip_to_province(client_ip) as province, " 
>"response_size, request_time")\
>.group_by("province")\
>.select( 
>"province, count(uuid) as access_count, " 
>"sum(response_size) as total_download,  " 
>"sum(response_size) * 1.0 / sum(request_time) as download_speed") \
>.insert_into("cdn_access_statistic") {code}
> Errors I get
> {code:java}
> Py4JJavaError: An error occurred while calling o60.insertInto.
> : java.lang.RuntimeException: Error while applying rule 
> FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args 
> [rel#107:LogicalAggregate.NONE.any.None: 
> 0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
>   at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>   at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
>   at 
> org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at 

[jira] [Updated] (FLINK-13811) Support converting flink table to pandas dataframe

2020-04-14 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-13811:
---
Description: Pandas dataframe is the de facto standard tabular data format 
of the python community. It would be nice to have the ability to convert a flink 
table to a pandas dataframe.  (was: Pandas dataframe is the refactor tableau data 
format of python community. It would be nice to have the ability to convert 
flink table to pandas dataframe.)

> Support converting flink table to pandas dataframe
> --
>
> Key: FLINK-13811
> URL: https://issues.apache.org/jira/browse/FLINK-13811
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.9.0
>Reporter: Jeff Zhang
>Priority: Major
>
> Pandas dataframe is the de facto standard tabular data format of the python 
> community. It would be nice to have the ability to convert a flink table to 
> a pandas dataframe.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17124) The PyFlink Job runs into infinite loop if the UDF file imports job code.

2020-04-13 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17082394#comment-17082394
 ] 

Jeff Zhang commented on FLINK-17124:


Should this be a blocker issue for 1.10.1?

> The PyFlink Job runs into infinite loop if the UDF file imports job code.
> -
>
> Key: FLINK-17124
> URL: https://issues.apache.org/jira/browse/FLINK-17124
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Huang Xingbo
>Priority: Major
>
> If the UDF file imports job code directly or indirectly, the PyFlink Job will 
> run into an infinite loop as follows:
>  - submit job
>  - execute job
>  - launch UDF worker
>  - import UDF
>  - (If the job file is depended by UDF or imported as the top level module) 
> import job code
>  - (If the job code is executed outside the "*if __name__ == '__main__':*") 
> launch gateway server and submit job to local executor
>  - execute job in local mode
>  - launch UDF worker
>  - import UDF
>  - import job code
>  ...
> This infinite loop will create new Java processes and Python processes 
> endlessly until the resources on the machine are exhausted. We should fix it 
> ASAP.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17114) When the pyflink job runs in local mode and the command "python" points to Python 2.7, the startup of the Python UDF worker will fail.

2020-04-13 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17082194#comment-17082194
 ] 

Jeff Zhang commented on FLINK-17114:


I think it depends on what the default python on your machine is. 

> When the pyflink job runs in local mode and the command "python" points to 
> Python 2.7, the startup of the Python UDF worker will fail.
> --
>
> Key: FLINK-17114
> URL: https://issues.apache.org/jira/browse/FLINK-17114
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Wei Zhong
>Assignee: Wei Zhong
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When the PyFlink job runs in local mode and the command "python" points to 
> Python 2.7, the startup of the Python UDF worker will fail because "python" 
> is the default interpreter of the Python UDF worker. For this case we need to 
> set the default value of "python.executable" to `sys.executable`, i.e. the 
> python interpreter that launches the job.
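
Until that default lands, a workaround sketch from the Table API side, assuming 
the documented "python.executable" config key (the interpreter path below is a 
placeholder):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// Point the Python UDF worker at an explicit Python 3 interpreter instead of
// whatever "python" resolves to on the machine.
tEnv.getConfig().getConfiguration().setString("python.executable", "/usr/bin/python3");
{code}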



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16969) Unable to use case class in flink scala shell

2020-04-03 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17074442#comment-17074442
 ] 

Jeff Zhang commented on FLINK-16969:


\cc [~tiwalter]

> Unable to use case class in flink scala shell
> -
>
> Key: FLINK-16969
> URL: https://issues.apache.org/jira/browse/FLINK-16969
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> {code:java}
> case class WC(word: String, count: Int)
> val wordCounts = benv.fromElements(WC("hello", 1),WC("world", 2), WC("world", 
> 8))
> wordCounts.collect(){code}
> Get the following exception
> {code}
> java.lang.IllegalArgumentException: requirement failed:
> The class WC is an instance class, meaning it is not a member of a
> toplevel object, or of an object contained in a toplevel object,
> therefore it requires an outer instance to be instantiated, but we don't have 
> a
> reference to the outer instance. Please consider changing the outer class to 
> an object.
>   at scala.Predef$.require(Predef.scala:224)
>   at 
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>   at 
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>   ... 66 elided
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16969) Unable to use case class in flink scala shell

2020-04-03 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-16969:
---
Description: 
{code:java}
case class WC(word: String, count: Int)
val wordCounts = benv.fromElements(WC("hello", 1),WC("world", 2), WC("world", 
8))
wordCounts.collect(){code}

Get the following exception
{code}
java.lang.IllegalArgumentException: requirement failed:
The class WC is an instance class, meaning it is not a member of a
toplevel object, or of an object contained in a toplevel object,
therefore it requires an outer instance to be instantiated, but we don't have a
reference to the outer instance. Please consider changing the outer class to an 
object.

  at scala.Predef$.require(Predef.scala:224)
  at 
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
  at 
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
  ... 66 elided

{code}

  was:
{code:java}
case class WC(word: String, count: Int)val wordCounts = 
benv.fromElements(WC("hello", 1),WC("world", 2), WC("world", 8))
wordCounts.collect()

java.lang.IllegalArgumentException: requirement failed:The class WC is an 
instance class, meaning it is not a member of atoplevel object, or of an object 
contained in a toplevel object,therefore it requires an outer instance to be 
instantiated, but we don't have areference to the outer instance. Please 
consider changing the outer class to an object.
  at scala.Predef$.require(Predef.scala:224)  at 
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
  at 
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
  ... 66 elided{code}


> Unable to use case class in flink scala shell
> -
>
> Key: FLINK-16969
> URL: https://issues.apache.org/jira/browse/FLINK-16969
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> {code:java}
> case class WC(word: String, count: Int)
> val wordCounts = benv.fromElements(WC("hello", 1),WC("world", 2), WC("world", 
> 8))
> wordCounts.collect(){code}
> Get the following exception
> {code}
> java.lang.IllegalArgumentException: requirement failed:
> The class WC is an instance class, meaning it is not a member of a
> toplevel object, or of an object contained in a toplevel object,
> therefore it requires an outer instance to be instantiated, but we don't have 
> a
> reference to the outer instance. Please consider changing the outer class to 
> an object.
>   at scala.Predef$.require(Predef.scala:224)
>   at 
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>   at 
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>   ... 66 elided
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16969) Unable to use case class in flink scala shell

2020-04-03 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-16969:
--

 Summary: Unable to use case class in flink scala shell
 Key: FLINK-16969
 URL: https://issues.apache.org/jira/browse/FLINK-16969
 Project: Flink
  Issue Type: Bug
  Components: Scala Shell
Affects Versions: 1.10.0
Reporter: Jeff Zhang


{code:java}
case class WC(word: String, count: Int)val wordCounts = 
benv.fromElements(WC("hello", 1),WC("world", 2), WC("world", 8))
wordCounts.collect()

java.lang.IllegalArgumentException: requirement failed:The class WC is an 
instance class, meaning it is not a member of atoplevel object, or of an object 
contained in a toplevel object,therefore it requires an outer instance to be 
instantiated, but we don't have areference to the outer instance. Please 
consider changing the outer class to an object.
  at scala.Predef$.require(Predef.scala:224)  at 
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
  at 
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
  ... 66 elided{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16936) TableEnv creation and planner execution must be in the same thread

2020-04-02 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17073417#comment-17073417
 ] 

Jeff Zhang edited comment on FLINK-16936 at 4/2/20, 6:44 AM:
-

[~godfreyhe] [~lzljs3620320] [~danny0405] [~jincheng][~tiwalter]


was (Author: zjffdu):
[~godfreyhe] [~lzljs3620320] [~danny0405] [~tiwalter]

> TableEnv creation and planner execution must be in the same thread 
> --
>
> Key: FLINK-16936
> URL: https://issues.apache.org/jira/browse/FLINK-16936
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> I hit this issue in zeppelin. Let me first describe the thread model of 
> zeppelin. In Zeppelin there are 3 threads: scalashell-thread is the thread 
> where the tableenv is created, python-thread is the python process thread, and 
> python-javagateway-thread is the thread handling requests from the python 
> thread (same as pyflink).
> Now if I use the following table api, I get the following exception. 
> {code:java}
> st_env.from_path("cdn_access_log")\
>.select("uuid, "
>"ip_to_province(client_ip) as province, " 
>"response_size, request_time")\
>.group_by("province")\
>.select( 
>"province, count(uuid) as access_count, " 
>"sum(response_size) as total_download,  " 
>"sum(response_size) * 1.0 / sum(request_time) as download_speed") \
>.insert_into("cdn_access_statistic") {code}
> Errors I get
> {code:java}
> Py4JJavaError: An error occurred while calling o60.insertInto.
> : java.lang.RuntimeException: Error while applying rule 
> FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args 
> [rel#107:LogicalAggregate.NONE.any.None: 
> 0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
>   at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>   at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
>   at 
> org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at 

[jira] [Comment Edited] (FLINK-16936) TablEnv creation and planner execution must be in the same thread

2020-04-02 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17073417#comment-17073417
 ] 

Jeff Zhang edited comment on FLINK-16936 at 4/2/20, 6:44 AM:
-

[~godfreyhe] [~lzljs3620320] [~danny0405] [~jincheng] [~tiwalter]


was (Author: zjffdu):
[~godfreyhe] [~lzljs3620320] [~danny0405] [~jincheng][~tiwalter]

> TablEnv creation and planner execution must be in the same thread 
> --
>
> Key: FLINK-16936
> URL: https://issues.apache.org/jira/browse/FLINK-16936
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> I hit this issue in zeppelin. Let me first describe the thread mode of 
> zeppelin. In Zeppelin there're 3 threads. scalashell-thread is thread where 
> tableenv created, python thread is the python process thread, 
> python-javagateway-thread is the thread handling request from python 
> thread(same as pyflink).
> Now if I use following table api, I will get the following exception. 
> {code:java}
> st_env.from_path("cdn_access_log")\
>.select("uuid, "
>"ip_to_province(client_ip) as province, " 
>"response_size, request_time")\
>.group_by("province")\
>.select( 
>"province, count(uuid) as access_count, " 
>"sum(response_size) as total_download,  " 
>"sum(response_size) * 1.0 / sum(request_time) as download_speed") \
>.insert_into("cdn_access_statistic") {code}
> Errors I get
> {code:java}
> Py4JJavaError: An error occurred while calling o60.insertInto.
> : java.lang.RuntimeException: Error while applying rule 
> FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args 
> [rel#107:LogicalAggregate.NONE.any.None: 
> 0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
>   at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>   at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
>   at 
> org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at 

[jira] [Commented] (FLINK-16936) TablEnv creation and planner execution must be in the same thread

2020-04-02 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17073417#comment-17073417
 ] 

Jeff Zhang commented on FLINK-16936:


[~godfreyhe] [~lzljs3620320] [~danny0405] [~tiwalter]

> TablEnv creation and planner execution must be in the same thread 
> --
>
> Key: FLINK-16936
> URL: https://issues.apache.org/jira/browse/FLINK-16936
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> I hit this issue in zeppelin. Let me first describe the thread mode of 
> zeppelin. In Zeppelin there're 3 threads. scalashell-thread is thread where 
> tableenv created, python thread is the python process thread, 
> python-javagateway-thread is the thread handling request from python 
> thread(same as pyflink).
> Now if I use following table api, I will get the following exception. 
> {code:java}
> st_env.from_path("cdn_access_log")\
>.select("uuid, "
>"ip_to_province(client_ip) as province, " 
>"response_size, request_time")\
>.group_by("province")\
>.select( 
>"province, count(uuid) as access_count, " 
>"sum(response_size) as total_download,  " 
>"sum(response_size) * 1.0 / sum(request_time) as download_speed") \
>.insert_into("cdn_access_statistic") {code}
> Errors I get
> {code:java}
> Py4JJavaError: An error occurred while calling o60.insertInto.
> : java.lang.RuntimeException: Error while applying rule 
> FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args 
> [rel#107:LogicalAggregate.NONE.any.None: 
> 0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
>   at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>   at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
>   at 
> org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at py4j.Gateway.invoke(Gateway.java:282)
>   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at 

[jira] [Updated] (FLINK-16936) TablEnv creation and planner execution must be in the same thread

2020-04-02 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-16936:
---
Description: 
I hit this issue in Zeppelin. Let me first describe the thread model of 
Zeppelin. In Zeppelin there are 3 threads: the scalashell-thread is the thread 
where the TableEnv is created, the python thread is the Python process thread, 
and the python-javagateway-thread is the thread that handles requests from the 
python thread (same as pyflink).

Now if I use the following Table API code, I get the following exception. 
{code:java}
st_env.from_path("cdn_access_log")\
   .select("uuid, "
   "ip_to_province(client_ip) as province, " 
   "response_size, request_time")\
   .group_by("province")\
   .select( 
   "province, count(uuid) as access_count, " 
   "sum(response_size) as total_download,  " 
   "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
   .insert_into("cdn_access_statistic") {code}
Errors I get
{code:java}
Py4JJavaError: An error occurred while calling o60.insertInto.
: java.lang.RuntimeException: Error while applying rule 
FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args 
[rel#107:LogicalAggregate.NONE.any.None: 
0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))]
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
  at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
  at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
  at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
  at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
  at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
  at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
  at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
  at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
  at 
org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:238)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error occurred while applying rule 
FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL)
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:143)
  at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:236)
  at 
org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:146)
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:208)
  ... 34 more
Caused by: java.lang.NullPointerException{code}

[jira] [Created] (FLINK-16936) TablEnv creation and planner execution must be in the same thread

2020-04-02 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-16936:
--

 Summary: TablEnv creation and planner execution must be in the 
same thread 
 Key: FLINK-16936
 URL: https://issues.apache.org/jira/browse/FLINK-16936
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Jeff Zhang


I hit this issue in Zeppelin. Let me first describe the thread model of 
Zeppelin, which involves 3 threads: the scalashell-thread is the thread where 
the TableEnv is created, the python thread is the Python process thread, and 
the python-javagateway-thread is the thread that handles requests from the 
python thread (same as pyflink).

Now if I use the following Table API code, I get the following exception. 
{code:java}
st_env.from_path("cdn_access_log")\
   .select("uuid, "
   "ip_to_province(client_ip) as province, " 
   "response_size, request_time")\
   .group_by("province")\
   .select( 
   "province, count(uuid) as access_count, " 
   "sum(response_size) as total_download,  " 
   "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
   .insert_into("cdn_access_statistic") {code}
Errors I get
{code:java}
Py4JJavaError: An error occurred while calling o60.insertInto.
: java.lang.RuntimeException: Error while applying rule 
FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args 
[rel#107:LogicalAggregate.NONE.any.None: 
0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))]
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
  at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
  at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
  at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
  at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
  at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
  at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
  at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
  at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
  at 
org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:238)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error occurred while applying rule 
FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL)
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:143)
  at 

[jira] [Updated] (FLINK-16936) TablEnv creation and planner execution must be in the same thread

2020-04-02 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-16936:
---
Description: 
I hit this issue in Zeppelin. Let me first describe the thread model of 
Zeppelin. In Zeppelin there are 3 threads: the scalashell-thread is the thread 
where the TableEnv is created, the python thread is the Python process thread, 
and the python-javagateway-thread is the thread that handles requests from the 
python thread (same as pyflink).

Now if I use the following Table API code, I get the following exception. 
{code:java}
st_env.from_path("cdn_access_log")\
   .select("uuid, "
   "ip_to_province(client_ip) as province, " 
   "response_size, request_time")\
   .group_by("province")\
   .select( 
   "province, count(uuid) as access_count, " 
   "sum(response_size) as total_download,  " 
   "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
   .insert_into("cdn_access_statistic") {code}
Errors I get
{code:java}
Py4JJavaError: An error occurred while calling o60.insertInto.
: java.lang.RuntimeException: Error while applying rule 
FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args 
[rel#107:LogicalAggregate.NONE.any.None: 
0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))]
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
  at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
  at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
  at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
  at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
  at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
  at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
  at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
  at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
  at 
org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:238)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error occurred while applying rule 
FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL)
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:143)
  at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:236)
  at 
org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:146)
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:208)
  ... 34 more
Caused by: java.lang.NullPointerException{code}
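
Until the planner itself tolerates cross-thread access, a workaround on the 
caller (Zeppelin) side is to confine every TableEnvironment call to one 
dedicated thread. A minimal sketch, not a Flink API; the {{TableEnvThread}} 
helper name is made up:
{code:scala}
import java.util.concurrent.{Callable, Executors}

// Workaround sketch (not Flink API): run TableEnvironment creation and every
// later planner call on the same single thread, so planner state that is
// thread-confined is always visible.
object TableEnvThread {
  private val executor = Executors.newSingleThreadExecutor()

  def run[T](body: => T): T =
    executor.submit(new Callable[T] { override def call(): T = body }).get()
}

// Usage sketch: both creation and execution go through the same thread.
// val tEnv = TableEnvThread.run(StreamTableEnvironment.create(bsEnv, bsSettings))
// TableEnvThread.run(tEnv.sqlUpdate("CREATE TABLE ..."))
{code}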

[jira] [Comment Edited] (FLINK-16797) Flink doesn't merge multiple sinks into one DAG

2020-03-26 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17067678#comment-17067678
 ] 

Jeff Zhang edited comment on FLINK-16797 at 3/26/20, 1:29 PM:
--

Thanks [~godfreyhe]  [~jark] [~twalthr]


was (Author: zjffdu):
Thanks [~godfreyhe] & [~jark] 

> Flink doesn't merge multiple sinks into one DAG
> ---
>
> Key: FLINK-16797
> URL: https://issues.apache.org/jira/browse/FLINK-16797
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> Here's sql I used.
> {code:java}
> insert into sink_kafka select status, direction, cast(event_ts/10 as 
> timestamp(3)) from source_kafka where status <> 'foo';
> insert into sink_kafka2 select status, direction, cast(event_ts/10 as 
> timestamp(3)) from source_kafka where status <> 'foo';
>  {code}
> Ideally flink should run these 2 sql as one dag with 2 sinks, but what I see 
> is that flink won't merge them into one dag.
> !https://cdn-images-1.medium.com/max/1760/1*mFu6OZivrfGUgu1UVCcy6A.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16797) Flink doesn't merge multiple sinks into one DAG

2020-03-26 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17067678#comment-17067678
 ] 

Jeff Zhang commented on FLINK-16797:


Thanks [~godfreyhe] & [~jark] 

> Flink doesn't merge multiple sinks into one DAG
> ---
>
> Key: FLINK-16797
> URL: https://issues.apache.org/jira/browse/FLINK-16797
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> Here's sql I used.
> {code:java}
> insert into sink_kafka select status, direction, cast(event_ts/10 as 
> timestamp(3)) from source_kafka where status <> 'foo';
> insert into sink_kafka2 select status, direction, cast(event_ts/10 as 
> timestamp(3)) from source_kafka where status <> 'foo';
>  {code}
> Ideally flink should run these 2 sql as one dag with 2 sinks, but what I see 
> is that flink won't merge them into one dag.
> !https://cdn-images-1.medium.com/max/1760/1*mFu6OZivrfGUgu1UVCcy6A.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16797) Flink doesn't merge multiple sinks into one DAG

2020-03-26 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17067524#comment-17067524
 ] 

Jeff Zhang commented on FLINK-16797:


[~twalthr] I just checked FLIP-84, but it looks like it is about API 
refactoring. The issue in this ticket is more about the planner & optimizer, I 
believe.

> Flink doesn't merge multiple sinks into one DAG
> ---
>
> Key: FLINK-16797
> URL: https://issues.apache.org/jira/browse/FLINK-16797
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> Here's sql I used.
> {code:java}
> insert into sink_kafka select status, direction, cast(event_ts/10 as 
> timestamp(3)) from source_kafka where status <> 'foo';
> insert into sink_kafka2 select status, direction, cast(event_ts/10 as 
> timestamp(3)) from source_kafka where status <> 'foo';
>  {code}
> Ideally flink should run these 2 sql as one dag with 2 sinks, but what I see 
> is that flink won't merge them into one dag.
> !https://cdn-images-1.medium.com/max/1760/1*mFu6OZivrfGUgu1UVCcy6A.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16797) Flink doesn't merge multiple sinks into one DAG

2020-03-26 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17067523#comment-17067523
 ] 

Jeff Zhang commented on FLINK-16797:


Thanks for the info [~twalthr]

> Flink doesn't merge multiple sinks into one DAG
> ---
>
> Key: FLINK-16797
> URL: https://issues.apache.org/jira/browse/FLINK-16797
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> Here's sql I used.
> {code:java}
> insert into sink_kafka select status, direction, cast(event_ts/10 as 
> timestamp(3)) from source_kafka where status <> 'foo';
> insert into sink_kafka2 select status, direction, cast(event_ts/10 as 
> timestamp(3)) from source_kafka where status <> 'foo';
>  {code}
> Ideally flink should run these 2 sql as one dag with 2 sinks, but what I see 
> is that flink won't merge them into one dag.
> !https://cdn-images-1.medium.com/max/1760/1*mFu6OZivrfGUgu1UVCcy6A.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16797) Flink doesn't merge multiple sinks into one DAG

2020-03-26 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-16797:
--

 Summary: Flink doesn't merge multiple sinks into one DAG
 Key: FLINK-16797
 URL: https://issues.apache.org/jira/browse/FLINK-16797
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Jeff Zhang


Here's the SQL I used.
{code:java}
insert into sink_kafka select status, direction, cast(event_ts/10 as 
timestamp(3)) from source_kafka where status <> 'foo';
insert into sink_kafka2 select status, direction, cast(event_ts/10 as 
timestamp(3)) from source_kafka where status <> 'foo';
 {code}
Ideally Flink should run these 2 SQL statements as one DAG with 2 sinks, but 
what I see is that Flink won't merge them into one DAG.

!https://cdn-images-1.medium.com/max/1760/1*mFu6OZivrfGUgu1UVCcy6A.png!
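
For reference, the FLIP-84 work mentioned in the comments later added exactly 
this: a {{StatementSet}} buffers several INSERT statements and submits them as 
one job. A sketch against the 1.11+ API, assuming {{tEnv}} is a 
TableEnvironment with {{source_kafka}} and both sinks already registered:
{code:scala}
// Sketch of the Flink 1.11+ StatementSet API (FLIP-84); `tEnv` and the table
// registrations are assumed to exist already.
val stmtSet = tEnv.createStatementSet()
stmtSet.addInsertSql(
  "insert into sink_kafka select status, direction, " +
    "cast(event_ts/10 as timestamp(3)) from source_kafka where status <> 'foo'")
stmtSet.addInsertSql(
  "insert into sink_kafka2 select status, direction, " +
    "cast(event_ts/10 as timestamp(3)) from source_kafka where status <> 'foo'")
stmtSet.execute() // both INSERTs are optimized together and submitted as one job
{code}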



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16604) Column key in JM configuration is too narrow

2020-03-15 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-16604:
---
Priority: Minor  (was: Major)

> Column key in JM configuration is too narrow
> 
>
> Key: FLINK-16604
> URL: https://issues.apache.org/jira/browse/FLINK-16604
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Minor
> Attachments: image-2020-03-15-22-22-01-696.png
>
>
> See the screenshot
>  
> !image-2020-03-15-22-22-01-696.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16604) Column key in JM configuration is too narrow

2020-03-15 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059711#comment-17059711
 ] 

Jeff Zhang commented on FLINK-16604:


\cc [~vthinkxie]

> Column key in JM configuration is too narrow
> 
>
> Key: FLINK-16604
> URL: https://issues.apache.org/jira/browse/FLINK-16604
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
> Attachments: image-2020-03-15-22-22-01-696.png
>
>
> See the screenshot
>  
> !image-2020-03-15-22-22-01-696.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16604) Column key in JM configuration is too narrow

2020-03-15 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-16604:
---
Attachment: (was: image-2020-03-15-22-21-17-369.png)

> Column key in JM configuration is too narrow
> 
>
> Key: FLINK-16604
> URL: https://issues.apache.org/jira/browse/FLINK-16604
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
> Attachments: image-2020-03-15-22-22-01-696.png
>
>
> See the screenshot
>  
> !image-2020-03-15-22-22-01-696.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16604) Column key in JM configuration is too narrow

2020-03-15 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-16604:
---
Description: 
See the screenshot

 

!image-2020-03-15-22-22-01-696.png!

  was:
See the screenshot

 

!image-2020-03-15-22-21-17-369.png!


> Column key in JM configuration is too narrow
> 
>
> Key: FLINK-16604
> URL: https://issues.apache.org/jira/browse/FLINK-16604
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
> Attachments: image-2020-03-15-22-21-17-369.png, 
> image-2020-03-15-22-22-01-696.png
>
>
> See the screenshot
>  
> !image-2020-03-15-22-22-01-696.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16604) Column key in JM configuration is too narrow

2020-03-15 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-16604:
---
Attachment: image-2020-03-15-22-22-01-696.png

> Column key in JM configuration is too narrow
> 
>
> Key: FLINK-16604
> URL: https://issues.apache.org/jira/browse/FLINK-16604
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
> Attachments: image-2020-03-15-22-21-17-369.png, 
> image-2020-03-15-22-22-01-696.png
>
>
> See the screenshot
>  
> !image-2020-03-15-22-21-17-369.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16604) Column key in JM configuration is too narrow

2020-03-15 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-16604:
--

 Summary: Column key in JM configuration is too narrow
 Key: FLINK-16604
 URL: https://issues.apache.org/jira/browse/FLINK-16604
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Affects Versions: 1.10.0
Reporter: Jeff Zhang
 Attachments: image-2020-03-15-22-21-17-369.png

See the screenshot

 

!image-2020-03-15-22-21-17-369.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16418) Hide hive version to avoid user confuse

2020-03-04 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17051021#comment-17051021
 ] 

Jeff Zhang commented on FLINK-16418:


[~lzljs3620320] Do you mean to detect the Hive version for users? 

> Hide hive version to avoid user confuse
> ---
>
> Key: FLINK-16418
> URL: https://issues.apache.org/jira/browse/FLINK-16418
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
> Fix For: 1.11.0
>
>
> Version in Yaml/HiveCatalog needs to be consistent with the dependencies 
> version. There are three places: version in metastore, version in 
> dependencies, version in Yaml/HiveCatalog, users are easy to make mistakes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14991) Export `FLINK_HOME` environment variable to all the entrypoint

2020-02-28 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16985271#comment-16985271
 ] 

Jeff Zhang edited comment on FLINK-14991 at 2/28/20 1:43 PM:
-

+1, but it would be better to keep `FLINK_CONF_DIR`. That means we could 
derive `FLINK_LIB_DIR`, `FLINK_OPT_DIR`, `FLINK_PLUGIN_DIR` and `FLINK_BIN_DIR` 
from `FLINK_HOME`, but look for the env variable `FLINK_CONF_DIR` first; if it 
doesn't exist, then fall back to `FLINK_HOME/conf`.
 The reason is that some vendor Flink distributions define `FLINK_CONF_DIR` 
separately, and it doesn't need to be under `FLINK_HOME`, e.g. /etc/flink/conf.


was (Author: zjffdu):
+1, but it would be better to reserve `FLINK_CONF_DIR`. That means we could 
derive `FLINK_LIB_DIR`,`FLINK_OPT_DIR`,`FLINK_PLUGIN_DIR`,`FLINK_BIN_DIR` from 
`FLINK_HOME`, but try to look for env `FLINK_CONF_DIR` first, if it doesn't 
exist, then use `FLINK_HOME/conf`.
The reason is that some vendor flink distribution will define `FLINK_HOME/conf` 
separately. 
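
A minimal sketch of the lookup order proposed in the comment above 
(illustrative logic only, not the actual Flink scripts; the variable names are 
made up):
{code:scala}
// Prefer an explicitly exported FLINK_CONF_DIR; otherwise derive every
// directory from FLINK_HOME with a fixed name.
val flinkHome  = sys.env.getOrElse("FLINK_HOME", sys.error("FLINK_HOME is not set"))
val confDir    = sys.env.getOrElse("FLINK_CONF_DIR", s"$flinkHome/conf")
val libDir     = s"$flinkHome/lib"
val optDir     = s"$flinkHome/opt"
val pluginsDir = s"$flinkHome/plugins"
val binDir     = s"$flinkHome/bin"
{code}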

> Export `FLINK_HOME` environment variable to all the entrypoint
> --
>
> Key: FLINK-14991
> URL: https://issues.apache.org/jira/browse/FLINK-14991
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client, Deployment / Scripts
>Reporter: Guowei Ma
>Priority: Minor
>
>  Currently, Flink depends on 6 types of files: configuration files, system 
> jar files, script files, library jar files, plugin jar files, and user jar 
> files. These files are in different directories. 
> Flink exports 5 environment variables to locate these different type files: 
> `FLINK_CONF_DIR`,`FLINK_LIB_DIR`,`FLINK_OPT_DIR`,`FLINK_PLUGIN_DIR`,`FLINK_BIN_DIR`.
> It is not a good style that exports an environment variable for every type of 
> file.
> So this jira proposes to export the `FLINK_HOME` environment variable to all 
> the entrypoint. Derive the directory of the different type files from the 
> `FLINK_HOME` environment variable and every type file has a fixed directory 
> name.
>  This also has another benefit that the method implies the directory 
> structure is the same in all the situations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-16286) Support select from nothing

2020-02-26 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang closed FLINK-16286.
--
Resolution: Invalid

> Support select from nothing
> ---
>
> Key: FLINK-16286
> URL: https://issues.apache.org/jira/browse/FLINK-16286
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> It would be nice to support from noting in flink sql. e.g. 
> {code:java}
> select "name", 1+1, Date() {code}
> This is especially useful when user want to try udf provided by others 
> without creating new table for faked data.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16286) Support select from nothing

2020-02-26 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17045244#comment-17045244
 ] 

Jeff Zhang commented on FLINK-16286:


Sorry, my fault. I used double quotes, which caused the parsing error. I should 
have used single quotes: 
{code:java}
select 'name' {code}

> Support select from nothing
> ---
>
> Key: FLINK-16286
> URL: https://issues.apache.org/jira/browse/FLINK-16286
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> It would be nice to support from noting in flink sql. e.g. 
> {code:java}
> select "name", 1+1, Date() {code}
> This is especially useful when user want to try udf provided by others 
> without creating new table for faked data.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16286) Support select from nothing

2020-02-26 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17045228#comment-17045228
 ] 

Jeff Zhang commented on FLINK-16286:


Is it supported in 1.10? I just tried Flink 1.10.

> Support select from nothing
> ---
>
> Key: FLINK-16286
> URL: https://issues.apache.org/jira/browse/FLINK-16286
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> It would be nice to support from noting in flink sql. e.g. 
> {code:java}
> select "name", 1+1, Date() {code}
> This is especially useful when user want to try udf provided by others 
> without creating new table for faked data.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16286) Support select from nothing

2020-02-25 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-16286:
---
Component/s: Table SQL / API

> Support select from nothing
> ---
>
> Key: FLINK-16286
> URL: https://issues.apache.org/jira/browse/FLINK-16286
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> It would be nice to support from noting in flink sql. e.g. 
> {code:java}
> select "name", 1+1, Date() {code}
> This is especially useful when user want to try udf provided by others 
> without creating new table for faked data.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16286) Support select from nothing

2020-02-25 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-16286:
--

 Summary: Support select from nothing
 Key: FLINK-16286
 URL: https://issues.apache.org/jira/browse/FLINK-16286
 Project: Flink
  Issue Type: New Feature
Affects Versions: 1.10.0
Reporter: Jeff Zhang


It would be nice to support selecting from nothing in Flink SQL, e.g. 
{code:java}
select "name", 1+1, Date() {code}
This is especially useful when a user wants to try a UDF provided by others 
without creating a new table of fake data.
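
For reference, a variant that does parse (per the comments above: string 
literals need single quotes, and {{CURRENT_DATE}} stands in for the 
hypothetical {{Date()}}):
{code:scala}
// Sketch assuming a Blink-planner TableEnvironment `tEnv`. SELECT without FROM
// works once the string literal uses single quotes; execute()/print() is the
// 1.11+ way to run and inspect the result.
val result = tEnv.sqlQuery("SELECT 'name', 1 + 1, CURRENT_DATE")
result.execute().print()
{code}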



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16266) es6 & 7 table sql connector conflicts each other

2020-02-24 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17044043#comment-17044043
 ] 

Jeff Zhang commented on FLINK-16266:


AFAIK the plugin mechanism is not ready for connectors yet. Someone else may 
know more details. 

> es6 & 7 table sql connector conflicts each other
> 
>
> Key: FLINK-16266
> URL: https://issues.apache.org/jira/browse/FLINK-16266
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.10.0
>Reporter: Benchao Li
>Priority: Major
>
> If we put {{flink-sql-connector-elasticsearch6}} and 
> {{flink-sql-connector-elasticsearch7}} into {{/lib}} at the same time, and 
> use it in {{sql-client}}, will get exceptions like:
>  
> {code:java}
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AbstractMethodError: 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase.createElasticsearchUpsertTableSink(ZLorg/apache/flink/table/api/TableSchema;Ljava/util/List;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Lorg/apache/flink/api/common/serialization/SerializationSchema;Lorg/apache/flink/elasticsearch6/shaded/org/elasticsearch/common/xcontent/XContentType;Lorg/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler;Ljava/util/Map;)Lorg/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase;
> {code}
>  
> After analyzing the exceptions, IMO, it's because 
> {{flink-connector-elasticsearch-base}} is included into both 
> {{flink-sql-connector-elasticsearch6}} and 
> {{flink-sql-connector-elasticsearch7}}. And 
> {{flink-connector-elasticsearch-base}} has different implementations in 6 
> & 7, because the version of Elasticsearch is different.
>  
> A simple way of fixing this is to relocate 
> {{flink-connector-elasticsearch-base}} in 
> {{flink-sql-connector-elasticsearch6}} and 
> {{flink-sql-connector-elasticsearch7}}. For example, for 
> {{flink-sql-connector-elasticsearch7}}:
> {code:java}
> <relocation>
>    <pattern>org.apache.flink.streaming.connectors.elasticsearch.</pattern>
>    <shadedPattern>org.apache.flink.streaming.connectors.elasticsearch7.base.</shadedPattern>
> </relocation>
> {code}
> cc [~jark] [~twalthr] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16266) es6 & 7 table sql connector conflicts each other

2020-02-24 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17044016#comment-17044016
 ] 

Jeff Zhang commented on FLINK-16266:


BTW, IMHO I don't think it is best practice to put connector jars under lib, 
because most of the time FLINK_HOME is shared by multiple users/applications. 
Putting third-party jars under lib would affect other Flink applications.

> es6 & 7 table sql connector conflicts each other
> 
>
> Key: FLINK-16266
> URL: https://issues.apache.org/jira/browse/FLINK-16266
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.10.0
>Reporter: Benchao Li
>Priority: Major
>
> If we put {{flink-sql-connector-elasticsearch6}} and 
> {{flink-sql-connector-elasticsearch7}} into {{/lib}} at the same time, and 
> use it in {{sql-client}}, will get exceptions like:
>  
> {code:java}
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AbstractMethodError: 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase.createElasticsearchUpsertTableSink(ZLorg/apache/flink/table/api/TableSchema;Ljava/util/List;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Lorg/apache/flink/api/common/serialization/SerializationSchema;Lorg/apache/flink/elasticsearch6/shaded/org/elasticsearch/common/xcontent/XContentType;Lorg/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler;Ljava/util/Map;)Lorg/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase;
> {code}
>  
> After analyzing the exceptions, IMO, it's because 
> {{flink-connector-elasticsearch-base}} is included into both 
> {{flink-sql-connector-elasticsearch6}} and 
> {{flink-sql-connector-elasticsearch7}}. And 
> {{flink-connector-elasticsearch-base}} has different implementations in 6 
> & 7, because the version of Elasticsearch is different.
>  
> A simple way of fixing this is to relocate 
> {{flink-connector-elasticsearch-base}} in 
> {{flink-sql-connector-elasticsearch6}} and 
> {{flink-sql-connector-elasticsearch7}}. For example, for 
> {{flink-sql-connector-elasticsearch7}}:
> {code:java}
> <relocation>
>    <pattern>org.apache.flink.streaming.connectors.elasticsearch.</pattern>
>    <shadedPattern>org.apache.flink.streaming.connectors.elasticsearch7.base.</shadedPattern>
> </relocation>
> {code}
> cc [~jark] [~twalthr] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16266) es6 & 7 table sql connector conflicts each other

2020-02-24 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17044013#comment-17044013
 ] 

Jeff Zhang commented on FLINK-16266:


I believe this could be resolved via plugin mechanism. 

> es6 & 7 table sql connector conflicts each other
> 
>
> Key: FLINK-16266
> URL: https://issues.apache.org/jira/browse/FLINK-16266
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.10.0
>Reporter: Benchao Li
>Priority: Major
>
> If we put {{flink-sql-connector-elasticsearch6}} and 
> {{flink-sql-connector-elasticsearch7}} into {{/lib}} at the same time, and 
> use it in {{sql-client}}, will get exceptions like:
>  
> {code:java}
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AbstractMethodError: 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase.createElasticsearchUpsertTableSink(ZLorg/apache/flink/table/api/TableSchema;Ljava/util/List;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Lorg/apache/flink/api/common/serialization/SerializationSchema;Lorg/apache/flink/elasticsearch6/shaded/org/elasticsearch/common/xcontent/XContentType;Lorg/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler;Ljava/util/Map;)Lorg/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase;
> {code}
>  
> After analyzing the exceptions, IMO, it's because 
> {{flink-connector-elasticsearch-base}} is included into both 
> {{flink-sql-connector-elasticsearch6}} and 
> {{flink-sql-connector-elasticsearch7}}. And 
> {{flink-connector-elasticsearch-base}} has different implementations in 6 
> & 7, because the version of Elasticsearch is different.
>  
> A simple way of fixing this is to relocate 
> {{flink-connector-elasticsearch-base}} in 
> {{flink-sql-connector-elasticsearch6}} and 
> {{flink-sql-connector-elasticsearch7}}. For example, for 
> {{flink-sql-connector-elasticsearch7}}:
> {code:java}
> <relocation>
>    <pattern>org.apache.flink.streaming.connectors.elasticsearch.</pattern>
>    <shadedPattern>org.apache.flink.streaming.connectors.elasticsearch7.base.</shadedPattern>
> </relocation>
> {code}
> cc [~jark] [~twalthr] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15935) Unable to use watermark when depends both on flink planner and blink planner

2020-02-05 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-15935:
---
Component/s: Table SQL / Planner

> Unable to use watermark when depends both on flink planner and blink planner
> 
>
> Key: FLINK-15935
> URL: https://issues.apache.org/jira/browse/FLINK-15935
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Blocker
>
> Run the following code in module `flink-examples-table` (must be under this 
> module)
> {code:java}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package org.apache.flink.table.examples.java;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> /**
>  * javadoc.
>  */
> public class TableApiExample {
>/**
> *
> * @param args
> * @throws Exception
> */
>public static void main(String[] args) throws Exception {
>   StreamExecutionEnvironment bsEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>   EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>   StreamTableEnvironment bsTableEnv = 
> StreamTableEnvironment.create(bsEnv, bsSettings);
>   bsTableEnv.sqlUpdate( "CREATE TABLE sink_kafka (\n" +
>  "status  STRING,\n" +
>  "direction STRING,\n" +
>  "event_ts TIMESTAMP(3),\n" +
>  "WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND\n" +
>  ") WITH (\n" +
>  "  'connector.type' = 'kafka',   \n" +
>  "  'connector.version' = 'universal',\n" +
>  "  'connector.topic' = 'generated.events2',\n" +
>  "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
>  "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
>  "  'connector.properties.group.id' = 'testGroup',\n" +
>  "  'format.type'='json',\n" +
>  "  'update-mode' = 'append'\n" +
>  ")\n");
>}
> }
>  {code}
>  
> And hit the following error:
> {code:java}
> Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: 
> From line 5, column 31 to line 5, column 38: Unknown identifier 'event_ts'
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>  at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>  at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
>  at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
>  at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841)
>  at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667)
>  at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
>  at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
>  at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
>  at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
>  at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501)
>  at 

[jira] [Updated] (FLINK-15935) Unable to use watermark when depends both on flink planner and blink planner

2020-02-05 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-15935:
---
Description: 
Run the following code in module `flink-examples-table` (it must be run under
this module)
{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.examples.java;


import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * javadoc.
 */
public class TableApiExample {

   /**
*
* @param args
* @throws Exception
*/
   public static void main(String[] args) throws Exception {

  StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
  bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
  StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);
  bsTableEnv.sqlUpdate( "CREATE TABLE sink_kafka (\n" +
 "status  STRING,\n" +
 "direction STRING,\n" +
 "event_ts TIMESTAMP(3),\n" +
 "WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND\n" +
 ") WITH (\n" +
 "  'connector.type' = 'kafka',   \n" +
 "  'connector.version' = 'universal',\n" +
 "  'connector.topic' = 'generated.events2',\n" +
 "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
 "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
 "  'connector.properties.group.id' = 'testGroup',\n" +
 "  'format.type'='json',\n" +
 "  'update-mode' = 'append'\n" +
 ")\n");

   }
}
 {code}
 

And hit the following error:
{code:java}
Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: 
From line 5, column 31 to line 5, column 38: Unknown identifier 
'event_ts'Exception in thread "main" 
org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to 
line 5, column 38: Unknown identifier 'event_ts' at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at 
org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) at 
org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
 at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
 at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501) at 
org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144) 
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
 at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:947)
 at 

[jira] [Commented] (FLINK-15935) Unable to use watermark when depends both on flink planner and blink planner

2020-02-05 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17031320#comment-17031320
 ] 

Jeff Zhang commented on FLINK-15935:


\cc [~jark] [~lzljs3620320] [~ykt836] [~liyu] [~gjy] 

> Unable to use watermark when depends both on flink planner and blink planner
> 
>
> Key: FLINK-15935
> URL: https://issues.apache.org/jira/browse/FLINK-15935
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Blocker
>
> Run the following code in module `flink-examples-table` (it must be run under
> this module)
> {code:java}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package org.apache.flink.table.examples.java;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> /**
>  * javadoc.
>  */
> public class TableApiExample {
>/**
> *
> * @param args
> * @throws Exception
> */
>public static void main(String[] args) throws Exception {
>   StreamExecutionEnvironment bsEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>   EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>   StreamTableEnvironment bsTableEnv = 
> StreamTableEnvironment.create(bsEnv, bsSettings);
>   bsTableEnv.sqlUpdate( "CREATE TABLE sink_kafka (\n" +
>  "status  STRING,\n" +
>  "direction STRING,\n" +
>  "event_ts TIMESTAMP(3),\n" +
>  "WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND\n" +
>  ") WITH (\n" +
>  "  'connector.type' = 'kafka',   \n" +
>  "  'connector.version' = 'universal',\n" +
>  "  'connector.topic' = 'generated.events2',\n" +
>  "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
>  "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
>  "  'connector.properties.group.id' = 'testGroup',\n" +
>  "  'format.type'='json',\n" +
>  "  'update-mode' = 'append'\n" +
>  ")\n");
>}
> }
>  {code}
>  
> And hit the following error:
> {code:java}
> Exception in thread "main" 
> org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to 
> line 5, column 38: Unknown identifier 'event_ts'Exception in thread "main" 
> org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to 
> line 5, column 38: Unknown identifier 'event_ts' at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) 
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) at 
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
>  at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
>  at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501) at 
> 

[jira] [Created] (FLINK-15935) Unable to use watermark when depends both on flink planner and blink planner

2020-02-05 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-15935:
--

 Summary: Unable to use watermark when depends both on flink 
planner and blink planner
 Key: FLINK-15935
 URL: https://issues.apache.org/jira/browse/FLINK-15935
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.10.0
Reporter: Jeff Zhang


Run the following code in module `flink-examples-table` (it must be run under
this module)
{code:java}

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.examples.java;


import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * javadoc.
 */
public class TableApiExample {

   /**
*
* @param args
* @throws Exception
*/
   public static void main(String[] args) throws Exception {

  StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
  bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
  StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);
  bsTableEnv.sqlUpdate( "CREATE TABLE sink_kafka (\n" +
 "status  STRING,\n" +
 "direction STRING,\n" +
 "event_ts TIMESTAMP(3),\n" +
 "WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND\n" +
 ") WITH (\n" +
 "  'connector.type' = 'kafka',   \n" +
 "  'connector.version' = 'universal',\n" +
 "  'connector.topic' = 'generated.events2',\n" +
 "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
 "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
 "  'connector.properties.group.id' = 'testGroup',\n" +
 "  'format.type'='json',\n" +
 "  'update-mode' = 'append'\n" +
 ")\n");

   }
}
 {code}
 

And hit the following error:
{code:java}

Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: 
From line 5, column 31 to line 5, column 38: Unknown identifier 
'event_ts'Exception in thread "main" 
org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to 
line 5, column 38: Unknown identifier 'event_ts' at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at 
org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) at 
org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
 at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
 at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501) at 
org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144) 
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
 at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
 at 

[jira] [Commented] (FLINK-15858) Unable to use HiveCatalog and kafka together

2020-02-04 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17030299#comment-17030299
 ] 

Jeff Zhang commented on FLINK-15858:


[~liyu] [~gjy] This ticket means users are unable to use Kafka together with 
HiveCatalog when a timestamp field is involved. I believe this should be a 
blocker issue for 1.10.

> Unable to use HiveCatalog and kafka together
> 
>
> Key: FLINK-15858
> URL: https://issues.apache.org/jira/browse/FLINK-15858
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Ecosystem
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
>  
> HiveCatalog only supports timestamp(9), but Kafka only supports timestamp(3). 
> This makes users unable to use HiveCatalog and Kafka together
> {code:java}
> Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: 
> HiveCatalog currently only supports timestamp of precision 9
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:272)
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:173)
>   at 
> org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151)
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTypeUtil.toHiveTypeInfo(HiveTypeUtil.java:84)
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTableUtil.createHiveColumns(HiveTableUtil.java:106)
>  {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
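
A minimal sketch of the kind of setup that hits this error, assuming Flink 1.10
with the blink planner and a reachable Hive metastore. The catalog name, Hive
conf dir, Hive version, and Kafka topic below are made-up placeholders:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogTimestampRepro {

   public static void main(String[] args) {
      TableEnvironment tableEnv = TableEnvironment.create(
         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

      // Placeholder conf dir and Hive version; adjust to the local setup.
      HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/etc/hive/conf", "2.3.4");
      tableEnv.registerCatalog("hive", hiveCatalog);
      tableEnv.useCatalog("hive");

      // The Kafka/JSON stack works with TIMESTAMP(3); persisting this schema
      // through HiveCatalog is what raises the CatalogException quoted above.
      tableEnv.sqlUpdate("CREATE TABLE source_kafka (\n" +
         "  status STRING,\n" +
         "  event_ts TIMESTAMP(3)\n" +
         ") WITH (\n" +
         "  'connector.type' = 'kafka',\n" +
         "  'connector.version' = 'universal',\n" +
         "  'connector.topic' = 'events',\n" +
         "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
         "  'format.type' = 'json',\n" +
         "  'update-mode' = 'append'\n" +
         ")");
   }
}
{code}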


[jira] [Updated] (FLINK-15858) Unable to use HiveCatalog and kafka together

2020-02-04 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-15858:
---
Priority: Blocker  (was: Critical)

> Unable to use HiveCatalog and kafka together
> 
>
> Key: FLINK-15858
> URL: https://issues.apache.org/jira/browse/FLINK-15858
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Ecosystem
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
>  
> HiveCatalog only supports timestamp(9), but Kafka only supports timestamp(3). 
> This makes users unable to use HiveCatalog and Kafka together
> {code:java}
> Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: 
> HiveCatalog currently only supports timestamp of precision 9
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:272)
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:173)
>   at 
> org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151)
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTypeUtil.toHiveTypeInfo(HiveTypeUtil.java:84)
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTableUtil.createHiveColumns(HiveTableUtil.java:106)
>  {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15858) Unable to use HiveCatalog and kafka together

2020-02-04 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17030295#comment-17030295
 ] 

Jeff Zhang commented on FLINK-15858:


[~lzljs3620320] But IIRC, this issue means users are unable to use Kafka 
together with HiveCatalog. Doesn't that make it a blocker issue?

> Unable to use HiveCatalog and kafka together
> 
>
> Key: FLINK-15858
> URL: https://issues.apache.org/jira/browse/FLINK-15858
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Ecosystem
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
>  
> HiveCatalog only supports timestamp(9), but Kafka only supports timestamp(3). 
> This makes users unable to use HiveCatalog and Kafka together
> {code:java}
> Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: 
> HiveCatalog currently only supports timestamp of precision 9
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:272)
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:173)
>   at 
> org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151)
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTypeUtil.toHiveTypeInfo(HiveTypeUtil.java:84)
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTableUtil.createHiveColumns(HiveTableUtil.java:106)
>  {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15858) Unable to use HiveCatalog and kafka together

2020-02-03 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029495#comment-17029495
 ] 

Jeff Zhang commented on FLINK-15858:


In that case, it is a regression, which definitely needs to be fixed in 1.10

> Unable to use HiveCatalog and kafka together
> 
>
> Key: FLINK-15858
> URL: https://issues.apache.org/jira/browse/FLINK-15858
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Ecosystem
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Blocker
> Fix For: 1.10.0
>
>
>  
> HiveCatalog only supports timestamp(9), but Kafka only supports timestamp(3). 
> This makes users unable to use HiveCatalog and Kafka together
> {code:java}
> Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: 
> HiveCatalog currently only supports timestamp of precision 9
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:272)
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:173)
>   at 
> org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151)
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTypeUtil.toHiveTypeInfo(HiveTypeUtil.java:84)
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTableUtil.createHiveColumns(HiveTableUtil.java:106)
>  {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15858) Unable to use HiveCatalog and kafka together

2020-02-03 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-15858:
---
Priority: Blocker  (was: Critical)

> Unable to use HiveCatalog and kafka together
> 
>
> Key: FLINK-15858
> URL: https://issues.apache.org/jira/browse/FLINK-15858
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Ecosystem
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Blocker
> Fix For: 1.10.0
>
>
>  
> HiveCatalog only supports timestamp(9), but Kafka only supports timestamp(3). 
> This makes users unable to use HiveCatalog and Kafka together
> {code:java}
> Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: 
> HiveCatalog currently only supports timestamp of precision 9
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:272)
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:173)
>   at 
> org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151)
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTypeUtil.toHiveTypeInfo(HiveTypeUtil.java:84)
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTableUtil.createHiveColumns(HiveTableUtil.java:106)
>  {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15858) Unable to use HiveCatalog and kafka together

2020-02-02 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17028713#comment-17028713
 ] 

Jeff Zhang commented on FLINK-15858:


\cc [~lirui] [~lzljs3620320]

> Unable to use HiveCatalog and kafka together
> 
>
> Key: FLINK-15858
> URL: https://issues.apache.org/jira/browse/FLINK-15858
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
>  
> HiveCatalog only supports timestamp(9), but Kafka only supports timestamp(3). 
> This makes users unable to use HiveCatalog and Kafka together
> {code:java}
> Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: 
> HiveCatalog currently only supports timestamp of precision 9
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:272)
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:173)
>   at 
> org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151)
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTypeUtil.toHiveTypeInfo(HiveTypeUtil.java:84)
>   at 
> org.apache.flink.table.catalog.hive.util.HiveTableUtil.createHiveColumns(HiveTableUtil.java:106)
>  {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15858) Unable to use HiveCatalog and kafka together

2020-02-02 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-15858:
--

 Summary: Unable to use HiveCatalog and kafka together
 Key: FLINK-15858
 URL: https://issues.apache.org/jira/browse/FLINK-15858
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.10.0
Reporter: Jeff Zhang


 

HiveCatalog only supports timestamp(9), but Kafka only supports timestamp(3). 
This makes users unable to use HiveCatalog and Kafka together
{code:java}

Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: 
HiveCatalog currently only supports timestamp of precision 9
at 
org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:272)
at 
org.apache.flink.table.catalog.hive.util.HiveTypeUtil$TypeInfoLogicalTypeVisitor.visit(HiveTypeUtil.java:173)
at 
org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151)
at 
org.apache.flink.table.catalog.hive.util.HiveTypeUtil.toHiveTypeInfo(HiveTypeUtil.java:84)
at 
org.apache.flink.table.catalog.hive.util.HiveTableUtil.createHiveColumns(HiveTableUtil.java:106)
 {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15644) Add support for SQL query validation

2020-01-18 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018592#comment-17018592
 ] 

Jeff Zhang commented on FLINK-15644:


[~fhueske] Will DDL/DML also be verified?

> Add support for SQL query validation 
> -
>
> Key: FLINK-15644
> URL: https://issues.apache.org/jira/browse/FLINK-15644
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Fabian Hueske
>Priority: Major
>
> It would be good if the {{TableEnvironment}} would offer methods to check the 
> validity of SQL queries. Such a method could be used by services (CLI query 
> shells, notebooks, SQL UIs) that are backed by Flink and execute their 
> queries on Flink.
> Validation should be available in two levels:
>  # Validation of syntax and semantics: This includes parsing the query, 
> checking the catalog for dbs, tables, fields, type checks for expressions and 
> functions, etc. This will check if the query is a valid SQL query.
>  # Validation that query is supported: Checks if Flink can execute the given 
> query. Some syntactically and semantically valid SQL queries are not 
> supported, esp. in a streaming context. This requires running the optimizer. 
> If the optimizer generates an execution plan, the query can be executed. This 
> check includes the first step and is more expensive.
> The reason for this separation is that the first check can be done much faster 
> as it does not involve calling the optimizer. Hence, it would be suitable for 
> fast checks in an interactive query editor. The second check might take more 
> time (depending on the complexity of the query) and might not be suitable for 
> rapid checks but only on explicit user request.
> Requirements:
>  * validation does not modify the state of the {{TableEnvironment}}, i.e. it 
> does not add plan operators
>  * validation does not require connector dependencies
>  * validation can identify the update mode of a continuous query result 
> (append-only, upsert, retraction).
> Out of scope for this issue:
>  * better error messages for unsupported features as suggested by FLINK-7217



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
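
To make the two levels concrete, here is one hypothetical shape such an API
could take. None of these types or methods exist in Flink today; this is only
a sketch of the proposal above:
{code:java}
import java.util.Optional;

/** Hypothetical validation API; not part of TableEnvironment today. */
public interface SqlValidationSupport {

   /** Level 1: parse + catalog/semantic checks, no optimizer involved (fast). */
   ValidationResult validateSyntaxAndSemantics(String statement);

   /** Level 2: run the optimizer to check Flink can execute the query (slower). */
   ValidationResult validateExecutability(String statement);

   final class ValidationResult {
      private final boolean valid;
      private final Optional<String> errorMessage;
      // Update mode of a continuous query result (append-only, upsert,
      // retraction); only known after level-2 validation.
      private final Optional<String> updateMode;

      ValidationResult(boolean valid, Optional<String> errorMessage,
            Optional<String> updateMode) {
         this.valid = valid;
         this.errorMessage = errorMessage;
         this.updateMode = updateMode;
      }

      public boolean isValid() { return valid; }
      public Optional<String> getErrorMessage() { return errorMessage; }
      public Optional<String> getUpdateMode() { return updateMode; }
   }
}
{code}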


[jira] [Commented] (FLINK-15635) Allow passing a ClassLoader to EnvironmentSettings

2020-01-17 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018069#comment-17018069
 ] 

Jeff Zhang commented on FLINK-15635:


[~twalthr] Would this affect the ClassLoader in TableFactoryService? I also 
hit a classloader issue in Zeppelin. Although I have a workaround, it would be 
nice to allow users to specify the classloader explicitly.

 

> Allow passing a ClassLoader to EnvironmentSettings
> --
>
> Key: FLINK-15635
> URL: https://issues.apache.org/jira/browse/FLINK-15635
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: Timo Walther
>Priority: Major
>
> We had a couple of class loading issues in the past because people forgot to 
> use the right classloader in {{flink-table}}. The SQL Client executor code 
> hacks a classloader into the planner process by using {{wrapClassLoader}} 
> that sets the threads context classloader.
> Instead we should allow passing a class loader to environment settings. This 
> class loader can be passed to the planner and can be stored in table 
> environment, table config, etc. to have a consistent class loading behavior.
> Having this in place should replace the need for 
> {{Thread.currentThread().getContextClassLoader()}} in the entire 
> {{flink-table}} module.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
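
One hypothetical shape for this proposal, sketched for illustration only;
withClassLoader(...) does not exist on EnvironmentSettings today:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;

public class ClassLoaderSettingsSketch {

   public static void main(String[] args) {
      // In embedding scenarios (SQL Client, Zeppelin), user code lives in its
      // own classloader rather than the one that loaded flink-table.
      ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader();

      // Hypothetical builder method: the planner, table config, and factory
      // discovery would all use this classloader instead of calling
      // Thread.currentThread().getContextClassLoader() themselves.
      EnvironmentSettings settings = EnvironmentSettings.newInstance()
         .useBlinkPlanner()
         .inStreamingMode()
         .withClassLoader(userClassLoader)
         .build();
   }
}
{code}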


[jira] [Created] (FLINK-15592) Streaming sql throw hive related sql when it doesn't use any hive table

2020-01-14 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-15592:
--

 Summary: Streaming sql throw hive related sql when it doesn't use 
any hive table
 Key: FLINK-15592
 URL: https://issues.apache.org/jira/browse/FLINK-15592
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Jeff Zhang


I use the following streaming SQL to query a Kafka table whose metadata is 
stored in the Hive metastore via HiveCatalog. But it throws a Hive-related 
exception, which is very confusing.

SQL
{code}
SELECT *
FROM (
   SELECT *,
 ROW_NUMBER() OVER(
   ORDER BY event_ts) AS rownum
   FROM source_kafka)
WHERE rownum <= 10
{code}

Exception
{code}
Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
failed. java.lang.reflect.InvocationTargetException
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:130)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
at 
org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:103)
... 13 more
Caused by: java.lang.RuntimeException: 
java.lang.reflect.InvocationTargetException
at 
org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:77)
at 
org.apache.flink.table.planner.functions.utils.HiveAggSqlFunction.lambda$createReturnTypeInference$0(HiveAggSqlFunction.java:82)
at 
org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470)
at 
org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:303)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at 
org.apache.calcite.sql.SqlCallBinding.getOperandType(SqlCallBinding.java:237)
at 
org.apache.calcite.sql.type.OrdinalReturnTypeInference.inferReturnType(OrdinalReturnTypeInference.java:40)
at 
org.apache.calcite.sql.type.SqlTypeTransformCascade.inferReturnType(SqlTypeTransformCascade.java:54)
at 
org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470)
at 
org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437)
at 
org.apache.calcite.sql.SqlOverOperator.deriveType(SqlOverOperator.java:86)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at 
org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:479)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4105)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3389)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 

[jira] [Updated] (FLINK-15592) Streaming sql throw hive exception when it doesn't use any hive table

2020-01-14 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-15592:
---
Summary: Streaming sql throw hive exception when it doesn't use any hive 
table  (was: Streaming sql throw hive related sql when it doesn't use any hive 
table)

> Streaming sql throw hive exception when it doesn't use any hive table
> -
>
> Key: FLINK-15592
> URL: https://issues.apache.org/jira/browse/FLINK-15592
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> I use the following streaming SQL to query a Kafka table whose metadata is 
> stored in the Hive metastore via HiveCatalog. But it throws a Hive-related 
> exception, which is very confusing.
> SQL
> {code}
> SELECT *
> FROM (
>SELECT *,
>  ROW_NUMBER() OVER(
>ORDER BY event_ts) AS rownum
>FROM source_kafka)
> WHERE rownum <= 10
> {code}
> Exception
> {code}
> Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
> failed. java.lang.reflect.InvocationTargetException
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:130)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>   at 
> org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:103)
>   ... 13 more
> Caused by: java.lang.RuntimeException: 
> java.lang.reflect.InvocationTargetException
>   at 
> org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:77)
>   at 
> org.apache.flink.table.planner.functions.utils.HiveAggSqlFunction.lambda$createReturnTypeInference$0(HiveAggSqlFunction.java:82)
>   at 
> org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470)
>   at 
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437)
>   at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:303)
>   at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
>   at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
>   at 
> org.apache.calcite.sql.SqlCallBinding.getOperandType(SqlCallBinding.java:237)
>   at 
> org.apache.calcite.sql.type.OrdinalReturnTypeInference.inferReturnType(OrdinalReturnTypeInference.java:40)
>   at 
> org.apache.calcite.sql.type.SqlTypeTransformCascade.inferReturnType(SqlTypeTransformCascade.java:54)
>   at 
> org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470)
>   at 
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437)
>   at 
> org.apache.calcite.sql.SqlOverOperator.deriveType(SqlOverOperator.java:86)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
>   at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
>   at 
> org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
>   at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:479)
>   at 
> 
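
Since the trace shows ROW_NUMBER being resolved through HiveAggSqlFunction,
one workaround worth trying is to unload the Hive module before running the
query. A sketch under the assumptions that a HiveModule was loaded under the
usual name "hive" and that source_kafka is already registered:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class RowNumberWithoutHiveModule {

   public static void main(String[] args) {
      TableEnvironment tableEnv = TableEnvironment.create(
         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

      // Assumption: a HiveModule loaded as "hive" is what makes ROW_NUMBER
      // resolve to HiveAggSqlFunction; unloading it should let the built-in
      // function be used instead.
      tableEnv.unloadModule("hive");

      Table top10 = tableEnv.sqlQuery(
         "SELECT * FROM (\n" +
         "  SELECT *, ROW_NUMBER() OVER (ORDER BY event_ts) AS rownum\n" +
         "  FROM source_kafka)\n" +
         "WHERE rownum <= 10");
   }
}
{code}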

[jira] [Commented] (FLINK-15566) Flink implicitly order the fields in PojoTypeInfo

2020-01-13 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014340#comment-17014340
 ] 

Jeff Zhang commented on FLINK-15566:


[~twalthr] When will FLIP-65 be completed?

> Flink implicitly order the fields in PojoTypeInfo
> -
>
> Key: FLINK-15566
> URL: https://issues.apache.org/jira/browse/FLINK-15566
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
> Attachments: image-2020-01-13-16-02-57-949.png
>
>
> I don't know why Flink would do that, but this causes my user-defined function 
> to behave incorrectly if I use a POJO in my UDF and override getResultType
> [https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85]
>  
> Here's the udf I define.
>  
> {code:java}
> %flink
> import org.apache.flink.api.java.typeutils.RowTypeInfo
> import org.apache.flink.api.common.typeinfo.Types
> import org.apache.flink.api.java.typeutils._
> import org.apache.flink.api.scala.typeutils._
> import org.apache.flink.api.scala._
> class Person(val age:Int, val job: String, val marital: String, val 
> education: String, val default: String, val balance: String, val housing: 
> String, val loan: String, val contact: String, val day: String, val month: 
> String, val duration: Int, val campaign: Int, val pdays: Int, val previous: 
> Int, val poutcome: String, val y: String)
> class ParseFunction extends TableFunction[Person] {
>   def eval(line: String) {
> val tokens = line.split(";")
> // parse the line
> if (!line.startsWith("\"age\"")) {
>   collect(new Person(new Integer(tokens(0).toInt), normalize(tokens(1)), 
> normalize(tokens(2)), normalize(tokens(3)), normalize(tokens(4)), 
> normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)), 
> normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), new 
> Integer(tokens(11).toInt),  new Integer(tokens(12).toInt),  
>new Integer(tokens(13).toInt), new Integer(tokens(14).toInt),  
> normalize(tokens(15)), normalize(tokens(16))))
> }
>   }
>   
>   override def getResultType() = {
> val cls = classOf[Person]
> new PojoTypeInfo[Person](classOf[Person], java.util.Arrays.asList(
>new PojoField(cls.getDeclaredField("age"), Types.INT),
>new PojoField(cls.getDeclaredField("job"), Types.STRING),
>new PojoField(cls.getDeclaredField("marital"), Types.STRING),
>new PojoField(cls.getDeclaredField("education"), Types.STRING),
>new PojoField(cls.getDeclaredField("default"), Types.STRING),
>new PojoField(cls.getDeclaredField("balance"), Types.STRING), 
>new PojoField(cls.getDeclaredField("housing"), Types.STRING), 
>new PojoField(cls.getDeclaredField("loan"), Types.STRING), 
>new PojoField(cls.getDeclaredField("contact"), Types.STRING), 
>new PojoField(cls.getDeclaredField("day"), Types.STRING), 
>new PojoField(cls.getDeclaredField("month"), Types.STRING), 
>new PojoField(cls.getDeclaredField("duration"), Types.INT),
>new PojoField(cls.getDeclaredField("campaign"), Types.INT),
>new PojoField(cls.getDeclaredField("pdays"), Types.INT),
>new PojoField(cls.getDeclaredField("previous"), Types.INT),
>new PojoField(cls.getDeclaredField("poutcome"), Types.STRING),
>new PojoField(cls.getDeclaredField("y"), Types.STRING)
>  ))
>   }  
>   // remove the quote
>   private def normalize(token: String) = {
>   if (token.startsWith("\"")) {
>   token.substring(1, token.length - 1)
>   } else {
>   token
>   }
>   }
> }{code}
> And then I use this UDF in SQL but get the wrong result because Flink reorders 
> the fields implicitly.
>  !image-2020-01-13-16-02-57-949.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
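
The reordering is easy to observe directly: PojoTypeInfo sorts the extracted
fields by name in its constructor (see the link above), so the positional
order differs from the declaration order. A small self-contained demo (the
class and field names here are made up):
{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class FieldOrderDemo {

   // A POJO whose fields are deliberately declared out of alphabetical order.
   public static class Person {
      public String job;
      public int age;
   }

   public static void main(String[] args) {
      TypeInformation<Person> typeInfo = TypeExtractor.createTypeInfo(Person.class);
      PojoTypeInfo<Person> pojoTypeInfo = (PojoTypeInfo<Person>) typeInfo;

      // Prints "age" then "job": the alphabetical order Flink uses
      // positionally, not the declaration order (job, age).
      for (String fieldName : pojoTypeInfo.getFieldNames()) {
         System.out.println(fieldName);
      }
   }
}
{code}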


[jira] [Created] (FLINK-15569) Incorrect sample code in udf document

2020-01-13 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-15569:
--

 Summary: Incorrect sample code in udf document
 Key: FLINK-15569
 URL: https://issues.apache.org/jira/browse/FLINK-15569
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.10.0
Reporter: Jeff Zhang
 Attachments: image-2020-01-13-16-59-00-022.png

The sample code should use JTuple2 instead of JTuple1

 !image-2020-01-13-16-59-00-022.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
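
For reference, a sketch of the corrected pattern (written in plain Java here;
the names are made up, and the doc page itself should be checked for the exact
snippet): a TableFunction whose type parameter is a two-field tuple has to
collect Tuple2 instances, not Tuple1.
{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableFunction;

// The type parameter Tuple2<String, Integer> determines the result schema,
// so eval must emit matching two-field tuples.
public class Split extends TableFunction<Tuple2<String, Integer>> {

   private final String separator;

   public Split(String separator) {
      this.separator = separator;
   }

   public void eval(String str) {
      for (String s : str.split(separator)) {
         // One row per token: (token, token length).
         collect(new Tuple2<>(s, s.length()));
      }
   }
}
{code}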


[jira] [Updated] (FLINK-15566) Flink implicitly order the fields in PojoTypeInfo

2020-01-13 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-15566:
---
Description: 
I don't know why Flink would do that, but this causes my user-defined function 
to behave incorrectly if I use a POJO in my UDF and override getResultType

[https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85]

 

Here's the udf I define.

 
{code:java}

%flink
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.scala.typeutils._
import org.apache.flink.api.scala._

class Person(val age:Int, val job: String, val marital: String, val education: 
String, val default: String, val balance: String, val housing: String, val 
loan: String, val contact: String, val day: String, val month: String, val 
duration: Int, val campaign: Int, val pdays: Int, val previous: Int, val 
poutcome: String, val y: String)

class ParseFunction extends TableFunction[Person] {
  def eval(line: String) {
val tokens = line.split(";")
// parse the line
if (!line.startsWith("\"age\"")) {
  collect(new Person(new Integer(tokens(0).toInt), normalize(tokens(1)), 
normalize(tokens(2)), normalize(tokens(3)), normalize(tokens(4)), 
normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)), 
normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), new 
Integer(tokens(11).toInt),  new Integer(tokens(12).toInt),  
   new Integer(tokens(13).toInt), new Integer(tokens(14).toInt),  
normalize(tokens(15)), normalize(tokens(16))))
}
  }
  
  override def getResultType() = {
val cls = classOf[Person]
new PojoTypeInfo[Person](classOf[Person], java.util.Arrays.asList(
   new PojoField(cls.getDeclaredField("age"), Types.INT),
   new PojoField(cls.getDeclaredField("job"), Types.STRING),
   new PojoField(cls.getDeclaredField("marital"), Types.STRING),
   new PojoField(cls.getDeclaredField("education"), Types.STRING),
   new PojoField(cls.getDeclaredField("default"), Types.STRING),
   new PojoField(cls.getDeclaredField("balance"), Types.STRING), 
   new PojoField(cls.getDeclaredField("housing"), Types.STRING), 
   new PojoField(cls.getDeclaredField("loan"), Types.STRING), 
   new PojoField(cls.getDeclaredField("contact"), Types.STRING), 
   new PojoField(cls.getDeclaredField("day"), Types.STRING), 
   new PojoField(cls.getDeclaredField("month"), Types.STRING), 
   new PojoField(cls.getDeclaredField("duration"), Types.INT),
   new PojoField(cls.getDeclaredField("campaign"), Types.INT),
   new PojoField(cls.getDeclaredField("pdays"), Types.INT),
   new PojoField(cls.getDeclaredField("previous"), Types.INT),
   new PojoField(cls.getDeclaredField("poutcome"), Types.STRING),
   new PojoField(cls.getDeclaredField("y"), Types.STRING)
 ))
  }  

  // remove the quote
  private def normalize(token: String) = {
  if (token.startsWith("\"")) {
  token.substring(1, token.length - 1)
  } else {
  token
  }
  }
}{code}

And then I use this UDF in SQL but get the wrong result because Flink reorders 
the fields implicitly.

 !image-2020-01-13-16-02-57-949.png! 



  was:
I don't know why Flink would do that, but this causes my user-defined function 
to behave incorrectly if I use a POJO in my UDF and override getResultType

[https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85]

 

Here's the udf I define.

 
{code:java}

%flink
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.scala.typeutils._
import org.apache.flink.api.scala._

class Person(val age:Int, val job: String, val marital: String, val education: 
String, val default: String, val balance: String, val housing: String, val 
loan: String, val contact: String, val day: String, val month: String, val 
duration: Int, val campaign: Int, val pdays: Int, val previous: Int, val 
poutcome: String, val y: String)

class ParseFunction extends TableFunction[Person] {
  def eval(line: String) {
val tokens = line.split(";")
// parse the line
if (!line.startsWith("\"age\"")) {
  collect(new Person(new Integer(tokens(0).toInt), normalize(tokens(1)), 
normalize(tokens(2)), normalize(tokens(3)), normalize(tokens(4)), 
normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)), 
normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), new 
Integer(tokens(11).toInt),  new Integer(tokens(12).toInt),  
   new Integer(tokens(13).toInt), new Integer(tokens(14).toInt),  
normalize(tokens(15)), normalize(tokens(16))))
}
  }
  
  override def 

[jira] [Updated] (FLINK-15566) Flink implicitly order the fields in PojoTypeInfo

2020-01-13 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-15566:
---
Description: 
I don't know why Flink would do that, but this causes my user-defined function 
to behave incorrectly if I use a POJO in my UDF and override getResultType

[https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85]

 

Here's the udf I define.

 
{code:java}

%flink
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.scala.typeutils._
import org.apache.flink.api.scala._

class Person(val age:Int, val job: String, val marital: String, val education: 
String, val default: String, val balance: String, val housing: String, val 
loan: String, val contact: String, val day: String, val month: String, val 
duration: Int, val campaign: Int, val pdays: Int, val previous: Int, val 
poutcome: String, val y: String)

class ParseFunction extends TableFunction[Person] {
  def eval(line: String) {
val tokens = line.split(";")
// parse the line
if (!line.startsWith("\"age\"")) {
  collect(new Person(new Integer(tokens(0).toInt), normalize(tokens(1)), 
normalize(tokens(2)), normalize(tokens(3)), normalize(tokens(4)), 
normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)), 
normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), new 
Integer(tokens(11).toInt),  new Integer(tokens(12).toInt),  
   new Integer(tokens(13).toInt), new Integer(tokens(14).toInt),  
normalize(tokens(15)), normalize(tokens(16))))
}
  }
  
  override def getResultType() = {
val cls = classOf[Person]
new PojoTypeInfo[Person](classOf[Person], java.util.Arrays.asList(
   new PojoField(cls.getDeclaredField("age"), Types.INT),
   new PojoField(cls.getDeclaredField("job"), Types.STRING),
   new PojoField(cls.getDeclaredField("marital"), Types.STRING),
   new PojoField(cls.getDeclaredField("education"), Types.STRING),
   new PojoField(cls.getDeclaredField("default"), Types.STRING),
   new PojoField(cls.getDeclaredField("balance"), Types.STRING), 
   new PojoField(cls.getDeclaredField("housing"), Types.STRING), 
   new PojoField(cls.getDeclaredField("loan"), Types.STRING), 
   new PojoField(cls.getDeclaredField("contact"), Types.STRING), 
   new PojoField(cls.getDeclaredField("day"), Types.STRING), 
   new PojoField(cls.getDeclaredField("month"), Types.STRING), 
   new PojoField(cls.getDeclaredField("duration"), Types.INT),
   new PojoField(cls.getDeclaredField("campaign"), Types.INT),
   new PojoField(cls.getDeclaredField("pdays"), Types.INT),
   new PojoField(cls.getDeclaredField("previous"), Types.INT),
   new PojoField(cls.getDeclaredField("poutcome"), Types.STRING),
   new PojoField(cls.getDeclaredField("y"), Types.STRING)
 ))
  }  

  // remove the quote
  private def normalize(token: String) = {
  if (token.startsWith("\"")) {
  token.substring(1, token.length - 1)
  } else {
  token
  }
  }
}{code}

And then I use this UDF in SQL but get the wrong result because Flink reorders 
the fields implicitly.

 !image-2020-01-13-16-02-57-949.png! 



  was:
I don't know why Flink would do that, but this causes my user-defined function 
to behave incorrectly if I use a POJO in my UDF and override getResultType

[https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85]

 

Here's the udf I define.

 
{code:java}

%flink
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.scala.typeutils._
import org.apache.flink.api.scala._

class Person(val age:Int, val job: String, val marital: String, val education: 
String, val default: String, val balance: String, val housing: String, val 
loan: String, val contact: String, val day: String, val month: String, val 
duration: Int, val campaign: Int, val pdays: Int, val previous: Int, val 
poutcome: String, val y: String)

class ParseFunction extends TableFunction[Person] {
  def eval(line: String) {
val tokens = line.split(";")
// parse the line
if (!line.startsWith("\"age\"")) {
  collect(Row.of(new Integer(tokens(0).toInt), normalize(tokens(1)), 
normalize(tokens(2)), normalize(tokens(3)), normalize(tokens(4)), 
normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)), 
normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), new 
Integer(tokens(11).toInt),  new Integer(tokens(12).toInt),  
   new Integer(tokens(13).toInt), new Integer(tokens(14).toInt),  
normalize(tokens(15)), normalize(tokens(16))))
}
  }
  
  override def 

[jira] [Commented] (FLINK-15566) Flink implicitly order the fields in PojoTypeInfo

2020-01-13 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014100#comment-17014100
 ] 

Jeff Zhang commented on FLINK-15566:


[~aljoscha] Could you take a look at this issue ? Seems you are the original 
author of that piece of code.

> Flink implicitly order the fields in PojoTypeInfo
> -
>
> Key: FLINK-15566
> URL: https://issues.apache.org/jira/browse/FLINK-15566
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
> Attachments: image-2020-01-13-16-02-57-949.png
>
>
> I don't know why Flink would do that, but this causes my user-defined function 
> to behave incorrectly if I use a POJO in my UDF and override getResultType
> [https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85]
>  
> Here's the udf I define.
>  
> {code:java}
> %flink
> import org.apache.flink.api.java.typeutils.RowTypeInfo
> import org.apache.flink.api.common.typeinfo.Types
> import org.apache.flink.api.java.typeutils._
> import org.apache.flink.api.scala.typeutils._
> import org.apache.flink.api.scala._
> class Person(val age:Int, val job: String, val marital: String, val 
> education: String, val default: String, val balance: String, val housing: 
> String, val loan: String, val contact: String, val day: String, val month: 
> String, val duration: Int, val campaign: Int, val pdays: Int, val previous: 
> Int, val poutcome: String, val y: String)
> class ParseFunction extends TableFunction[Person] {
>   def eval(line: String) {
> val tokens = line.split(";")
> // parse the line
> if (!line.startsWith("\"age\"")) {
>   collect(Row.of(new Integer(tokens(0).toInt), normalize(tokens(1)), 
> normalize(tokens(2)), normalize(tokens(3)), normalize(tokens(4)), 
> normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)), 
> normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), new 
> Integer(tokens(11).toInt),  new Integer(tokens(12).toInt),  
>new Integer(tokens(13).toInt), new Integer(tokens(14).toInt),  
> normalize(tokens(15)), normalize(tokens(16))))
> }
>   }
>   
>   override def getResultType() = {
> val cls = classOf[Person]
> new PojoTypeInfo[Person](classOf[Person], java.util.Arrays.asList(
>new PojoField(cls.getDeclaredField("age"), Types.INT),
>new PojoField(cls.getDeclaredField("job"), Types.STRING),
>new PojoField(cls.getDeclaredField("marital"), Types.STRING),
>new PojoField(cls.getDeclaredField("education"), Types.STRING),
>new PojoField(cls.getDeclaredField("default"), Types.STRING),
>new PojoField(cls.getDeclaredField("balance"), Types.STRING), 
>new PojoField(cls.getDeclaredField("housing"), Types.STRING), 
>new PojoField(cls.getDeclaredField("loan"), Types.STRING), 
>new PojoField(cls.getDeclaredField("contact"), Types.STRING), 
>new PojoField(cls.getDeclaredField("day"), Types.STRING), 
>new PojoField(cls.getDeclaredField("month"), Types.STRING), 
>new PojoField(cls.getDeclaredField("duration"), Types.INT),
>new PojoField(cls.getDeclaredField("campaign"), Types.INT),
>new PojoField(cls.getDeclaredField("pdays"), Types.INT),
>new PojoField(cls.getDeclaredField("previous"), Types.INT),
>new PojoField(cls.getDeclaredField("poutcome"), Types.STRING),
>new PojoField(cls.getDeclaredField("y"), Types.STRING)
>  ))
>   }  
>   // remove the quote
>   private def normalize(token: String) = {
>   if (token.startsWith("\"")) {
>   token.substring(1, token.length - 1)
>   } else {
>   token
>   }
>   }
> }{code}
> And then I use this UDF in SQL but get the wrong result because Flink reorders 
> the fields implicitly.
>  !image-2020-01-13-16-02-57-949.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15566) Flink implicitly order the fields in PojoTypeInfo

2020-01-13 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-15566:
--

 Summary: Flink implicitly order the fields in PojoTypeInfo
 Key: FLINK-15566
 URL: https://issues.apache.org/jira/browse/FLINK-15566
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Affects Versions: 1.10.0
Reporter: Jeff Zhang
 Attachments: image-2020-01-13-16-02-57-949.png

I don't know why Flink does this, but it makes my user-defined function 
behave incorrectly when I use a POJO in my UDF and override getResultType:

[https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85]

 

Here's the UDF I define:

 
{code:java}

%flink
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.scala.typeutils._
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

class Person(val age:Int, val job: String, val marital: String, val education: 
String, val default: String, val balance: String, val housing: String, val 
loan: String, val contact: String, val day: String, val month: String, val 
duration: Int, val campaign: Int, val pdays: Int, val previous: Int, val 
poutcome: String, val y: String)

class ParseFunction extends TableFunction[Person] {
  def eval(line: String) {
val tokens = line.split(";")
// parse the line
if (!line.startsWith("\"age\"")) {
  collect(Row.of(new Integer(tokens(0).toInt), normalize(tokens(1)), 
normalize(tokens(2)), normalize(tokens(3)), normalize(tokens(4)), 
normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)), 
normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), new 
Integer(tokens(11).toInt),  new Integer(tokens(12).toInt),  
   new Integer(tokens(13).toInt), new Integer(tokens(14).toInt),  
normalize(tokens(15)), normalize(tokens(16))))
}
  }
  
  override def getResultType() = {
val cls = classOf[Person]
new PojoTypeInfo[Person](classOf[Person], java.util.Arrays.asList(
   new PojoField(cls.getDeclaredField("age"), Types.INT),
   new PojoField(cls.getDeclaredField("job"), Types.STRING),
   new PojoField(cls.getDeclaredField("marital"), Types.STRING),
   new PojoField(cls.getDeclaredField("education"), Types.STRING),
   new PojoField(cls.getDeclaredField("default"), Types.STRING),
   new PojoField(cls.getDeclaredField("balance"), Types.STRING), 
   new PojoField(cls.getDeclaredField("housing"), Types.STRING), 
   new PojoField(cls.getDeclaredField("loan"), Types.STRING), 
   new PojoField(cls.getDeclaredField("contact"), Types.STRING), 
   new PojoField(cls.getDeclaredField("day"), Types.STRING), 
   new PojoField(cls.getDeclaredField("month"), Types.STRING), 
   new PojoField(cls.getDeclaredField("duration"), Types.INT),
   new PojoField(cls.getDeclaredField("campaign"), Types.INT),
   new PojoField(cls.getDeclaredField("pdays"), Types.INT),
   new PojoField(cls.getDeclaredField("previous"), Types.INT),
   new PojoField(cls.getDeclaredField("poutcome"), Types.STRING),
   new PojoField(cls.getDeclaredField("y"), Types.STRING)
 ))
  }  

  // remove the quote
  private def normalize(token: String) = {
  if (token.startsWith("\"")) {
  token.substring(1, token.length - 1)
  } else {
  token
  }
  }
}{code}

And then I use this UDF in SQL but get the wrong result, because Flink 
reorders the fields implicitly.

 !image-2020-01-13-16-02-57-949.png! 
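For anyone reproducing this, here is a minimal, self-contained sketch (plain 
Java, with a hypothetical Person class) of the reordering: PojoTypeInfo sorts 
the extracted fields alphabetically by name, so declaration order is not what 
comes back.

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class PojoOrderDemo {

    // Hypothetical POJO with fields deliberately declared out of
    // alphabetical order.
    public static class Person {
        public String job;
        public int age;
        public String marital;

        public Person() {}
    }

    public static void main(String[] args) {
        TypeInformation<Person> info = TypeExtractor.createTypeInfo(Person.class);
        PojoTypeInfo<Person> pojoInfo = (PojoTypeInfo<Person>) info;
        // Prints "age", "job", "marital": the fields come back sorted by
        // name, not in declaration order, which is why a Row built in
        // declaration order no longer lines up with the extracted type.
        for (int i = 0; i < pojoInfo.getArity(); i++) {
            System.out.println(pojoInfo.getPojoFieldAt(i).getField().getName());
        }
    }
}
{code}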





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15562) Unable to create walk through project

2020-01-12 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-15562:
---
Component/s: Documentation

> Unable to create walk through project
> -
>
> Key: FLINK-15562
> URL: https://issues.apache.org/jira/browse/FLINK-15562
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> I tried to follow the instructions here to create a Flink walkthrough 
> project, but hit the following errors.
> [https://ci.apache.org/projects/flink/flink-docs-master/getting-started/walkthroughs/table_api.html]
> {code:java}
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-archetype-plugin:3.0.1:generate (default-cli) 
> on project standalone-pom: archetypeCatalog 
> 'https://repository.apache.org/content/repositories/snapshots/' is not 
> supported anymore. Please read the plugin documentation for details. -> [Help 
> 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
> switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions, please 
> read the following articles:
> [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15562) Unable to create walk through project

2020-01-12 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-15562:
--

 Summary: Unable to create walk through project
 Key: FLINK-15562
 URL: https://issues.apache.org/jira/browse/FLINK-15562
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.10.0
Reporter: Jeff Zhang


I tried to follow the instructions here to create a Flink walkthrough project, 
but hit the following errors.

[https://ci.apache.org/projects/flink/flink-docs-master/getting-started/walkthroughs/table_api.html]
{code:java}
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-archetype-plugin:3.0.1:generate (default-cli) on 
project standalone-pom: archetypeCatalog 
'https://repository.apache.org/content/repositories/snapshots/' is not 
supported anymore. Please read the plugin documentation for details. -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please 
read the following articles:
[ERROR] [Help 1] 
http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
{code}
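A possible workaround (hedged: the plugin version and archetype coordinates 
below are assumptions, not taken from this report) is to invoke the 2.x 
archetype plugin explicitly, since maven-archetype-plugin 3.x dropped support 
for the archetypeCatalog property:

{code}
mvn org.apache.maven.plugins:maven-archetype-plugin:2.4:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-walkthrough-table-java \
  -DarchetypeVersion=1.10.0 \
  -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
{code}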



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15384) Expose JobListener API on TableEnvironment

2020-01-06 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-15384:
---
Summary: Expose JobListener API on TableEnvironment  (was: Allow to pass 
ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment )

> Expose JobListener API on TableEnvironment
> --
>
> Key: FLINK-15384
> URL: https://issues.apache.org/jira/browse/FLINK-15384
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> In the new approach of creating a TableEnvironment, it is not possible to 
> pass an ExecutionEnvironment/StreamExecutionEnvironment to the 
> TableEnvironment. This makes all the features of 
> ExecutionEnvironment/StreamExecutionEnvironment, such as adding a JobListener 
> or executing a job asynchronously, unavailable in the Table API. So I suggest 
> allowing an ExecutionEnvironment/StreamExecutionEnvironment to be passed to 
> the TableEnvironment.
> {code}
> EnvironmentSettings bbSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
> {code}
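For context, a minimal sketch (1.10-era APIs; the listener body is 
illustrative) of the only workaround today: register the JobListener on a 
StreamExecutionEnvironment and build the table environment on top of it. The 
unified TableEnvironment.create(settings) path offers no equivalent hook, and 
BatchTableEnvironment has none at all.

{code:java}
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class JobListenerWorkaround {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                if (jobClient != null) {
                    System.out.println("submitted: " + jobClient.getJobID());
                }
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                if (result != null) {
                    System.out.println("finished in " + result.getNetRuntime() + " ms");
                }
            }
        });
        EnvironmentSettings settings =
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Only the stream bridge accepts a pre-configured environment, so the
        // listener is inherited; the unified create(settings) path is not.
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
    }
}
{code}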



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15439) Incorrect SQL Document about DDL support

2019-12-30 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-15439:
--

 Summary: Incorrect SQL Document about DDL support
 Key: FLINK-15439
 URL: https://issues.apache.org/jira/browse/FLINK-15439
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.10.0
Reporter: Jeff Zhang
 Attachments: image-2019-12-31-09-45-02-044.png

 !image-2019-12-31-09-45-02-044.png! 

DDL is supported now; the document should be updated accordingly. 
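For example, the page could include a snippet along these lines (a sketch 
only; the table definition and connector properties are assumptions in the 
1.10 'connector.type' style):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DdlExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
        // Register a table via DDL; names, path, and format are illustrative.
        tableEnv.sqlUpdate(
            "CREATE TABLE orders ("
                + "  order_id BIGINT,"
                + "  price DOUBLE"
                + ") WITH ("
                + "  'connector.type' = 'filesystem',"
                + "  'connector.path' = 'file:///tmp/orders.csv',"
                + "  'format.type' = 'csv'"
                + ")");
    }
}
{code}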



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15395) No API for execute insert into statement

2019-12-25 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003500#comment-17003500
 ] 

Jeff Zhang commented on FLINK-15395:


[~ykt836] Here's how the sql-client executes `insert into`; this is too 
complex. (Besides calling sqlUpdate, I also need to do other complex stuff.)

https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L572


> No API for execute insert into statement 
> -
>
> Key: FLINK-15395
> URL: https://issues.apache.org/jira/browse/FLINK-15395
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> IIUC, TableEnv#sqlUpdate is used for DDL & DML while TableEnv#sqlQuery is 
> used for select statements. Unfortunately, `insert into` seems to be a 
> special case for which no simple API is available. 
> The code implementing `insert into` in the sql-client is pretty complex; it 
> would be nice to have one simple API for executing `insert into`:
> https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L572



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15395) No API for execute insert into statement

2019-12-25 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003497#comment-17003497
 ] 

Jeff Zhang commented on FLINK-15395:


Thanks [~jark] [~ykt836]. `sqlUpdate` would be the best option; unfortunately, 
it currently cannot be done that way. 

> No API for execute insert into statement 
> -
>
> Key: FLINK-15395
> URL: https://issues.apache.org/jira/browse/FLINK-15395
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> IIUC, TableEnv#sqlUpdate is used for DDL & DML while TableEnv#sqlQuery is 
> used for select statements. Unfortunately, `insert into` seems to be a 
> special case for which no simple API is available. 
> The code implementing `insert into` in the sql-client is pretty complex; it 
> would be nice to have one simple API for executing `insert into`:
> https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L572



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15395) No API for execute insert into statement

2019-12-25 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-15395:
---
Description: 
IIUC, TableEnv#sqlUpdate is used for DDL & DML while TableEnv#sqlQuery is used 
for select statements. Unfortunately, `insert into` seems to be a special case 
for which no simple API is available. 
The code implementing `insert into` in the sql-client is pretty complex; it 
would be nice to have one simple API for executing `insert into`:
https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L572


  was:
IIUC, TableEnv#sqlUpdate is used for DDL while TableEnv#sqlQuery is used for 
DML. Unfortunately, `insert into` seems to be a special case for which no 
simple API is available. 
The code implementing `insert into` in the sql-client is pretty complex; it 
would be nice to have one simple API for executing `insert into`:
https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L572



> No API for execute insert into statement 
> -
>
> Key: FLINK-15395
> URL: https://issues.apache.org/jira/browse/FLINK-15395
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> IIUC, TableEnv#sqlUpdate is used for DDL & DML while TableEnv#sqlQuery is 
> used for select statements. Unfortunately, `insert into` seems to be a 
> special case for which no simple API is available. 
> The code implementing `insert into` in the sql-client is pretty complex; it 
> would be nice to have one simple API for executing `insert into`:
> https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L572



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15395) No API for execute insert into statement

2019-12-25 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-15395:
---
Issue Type: Improvement  (was: Bug)

> No API for execute insert into statement 
> -
>
> Key: FLINK-15395
> URL: https://issues.apache.org/jira/browse/FLINK-15395
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> IIUC, TableEnv#sqlUpdate is used for DDL while TableEnv#sqlQuery is used for 
> DML. Unfortunately, `insert into` seems to be a special case for which no 
> simple API is available. 
> The code implementing `insert into` in the sql-client is pretty complex; it 
> would be nice to have one simple API for executing `insert into`:
> https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L572



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15395) No API for execute insert into statement

2019-12-25 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003474#comment-17003474
 ] 

Jeff Zhang commented on FLINK-15395:


\cc [~tiwalter] [~aljoscha]

> No API for execute insert into statement 
> -
>
> Key: FLINK-15395
> URL: https://issues.apache.org/jira/browse/FLINK-15395
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> IIUC, TableEnv#sqlUpdate is used for DDL while TableEnv#sqlQuery is used for 
> DML. Unfortunately, `insert into` seems to be a special case for which no 
> simple API is available. 
> The code implementing `insert into` in the sql-client is pretty complex; it 
> would be nice to have one simple API for executing `insert into`:
> https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L572



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15395) No API for execute insert into statement

2019-12-25 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-15395:
--

 Summary: No API for execute insert into statement 
 Key: FLINK-15395
 URL: https://issues.apache.org/jira/browse/FLINK-15395
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: Jeff Zhang


IIUC, TableEnv#sqlUpdate is used for DDL while TableEnv#sqlQuery is used for 
DML. Unfortunately, `insert into` seems to be a special case for which no 
simple API is available. 
The code implementing `insert into` in the sql-client is pretty complex; it 
would be nice to have one simple API for executing `insert into`:
https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L572
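For comparison, a minimal sketch of the two-step pattern that is available 
today (1.10-era API; the table names are hypothetical and assumed to be 
registered already):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class InsertIntoSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
        // source_table and sink_table are assumed to exist in the catalog.
        tEnv.sqlUpdate("INSERT INTO sink_table SELECT name, cnt FROM source_table");
        // sqlUpdate only buffers the INSERT; the job is submitted here, and
        // this call returns no job handle, which is what the sql-client has
        // to work around with its own deployment code.
        tEnv.execute("insert-into-example");
    }
}
{code}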




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15384) Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment

2019-12-24 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003080#comment-17003080
 ] 

Jeff Zhang edited comment on FLINK-15384 at 12/25/19 5:31 AM:
--

[~ykt836] I can do it for StreamTableEnvironment, but there's no such API for 
BatchTableEnvironment. 
And I guess using TableEnvironment.create is the preferred way.

BTW, the documentation on creating a TableEnvironment is really confusing. 
There are so many different approaches for creating the different 
TableEnvironments (flink/blink planner, stream/batch):
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment



was (Author: zjffdu):
[~ykt836] I can do for StreamTableEnvironment, but there's no such API for 
BatchTableEnvironment. 
And I guess using TableEnvironment.create is the preferred way.

BTW, the documentation on creating a TableEnvironment is really confusing. 
There are so many different approaches for creating the different 
TableEnvironments (flink/blink planner, stream/batch):
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment


> Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to 
> TableEnvironment 
> --
>
> Key: FLINK-15384
> URL: https://issues.apache.org/jira/browse/FLINK-15384
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> In the new approach of creating a TableEnvironment, it is not possible to 
> pass an ExecutionEnvironment/StreamExecutionEnvironment to the 
> TableEnvironment. This makes all the features of 
> ExecutionEnvironment/StreamExecutionEnvironment, such as adding a JobListener 
> or executing a job asynchronously, unavailable in the Table API. So I suggest 
> allowing an ExecutionEnvironment/StreamExecutionEnvironment to be passed to 
> the TableEnvironment.
> {code}
> EnvironmentSettings bbSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15384) Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment

2019-12-24 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003080#comment-17003080
 ] 

Jeff Zhang commented on FLINK-15384:


[~ykt836] I can do for StreamTableEnvironment, but there's no such API for 
BatchTableEnvironment. 
And I guess using TableEnvironment.create is the preferred way.

BTW, the documentation on creating a TableEnvironment is really confusing. 
There are so many different approaches for creating the different 
TableEnvironments (flink/blink planner, stream/batch); see the sketch after 
the link below:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
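To make the confusion concrete, here is a sketch of part of that creation 
matrix (1.10-era Java APIs; this is a subset, not the complete list):

{code:java}
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class CreationMatrix {
    public static void main(String[] args) {
        // 1) Unified entry point, blink planner, batch mode:
        TableEnvironment unified = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // 2) Stream-specific entry point bound to a DataStream environment:
        StreamTableEnvironment stream = StreamTableEnvironment.create(
            StreamExecutionEnvironment.getExecutionEnvironment());

        // 3) Legacy flink-planner batch entry point (no blink equivalent):
        BatchTableEnvironment batch = BatchTableEnvironment.create(
            ExecutionEnvironment.getExecutionEnvironment());
    }
}
{code}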


> Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to 
> TableEnvironment 
> --
>
> Key: FLINK-15384
> URL: https://issues.apache.org/jira/browse/FLINK-15384
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> In the new approach of creating a TableEnvironment, it is not possible to 
> pass an ExecutionEnvironment/StreamExecutionEnvironment to the 
> TableEnvironment. This makes all the features of 
> ExecutionEnvironment/StreamExecutionEnvironment, such as adding a JobListener 
> or executing a job asynchronously, unavailable in the Table API. So I suggest 
> allowing an ExecutionEnvironment/StreamExecutionEnvironment to be passed to 
> the TableEnvironment.
> {code}
> EnvironmentSettings bbSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15384) Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment

2019-12-24 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17002847#comment-17002847
 ] 

Jeff Zhang edited comment on FLINK-15384 at 12/24/19 2:01 PM:
--

\cc [~aljoscha] [~kkloudas] [~tison] [~lzljs3620320] 


was (Author: zjffdu):
\cc [~aljoscha] [~kkloudas] [~tison]

> Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to 
> TableEnvironment 
> --
>
> Key: FLINK-15384
> URL: https://issues.apache.org/jira/browse/FLINK-15384
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> In the new approach of creating a TableEnvironment, it is not possible to 
> pass an ExecutionEnvironment/StreamExecutionEnvironment to the 
> TableEnvironment. This makes all the features of 
> ExecutionEnvironment/StreamExecutionEnvironment, such as adding a JobListener 
> or executing a job asynchronously, unavailable in the Table API. So I suggest 
> allowing an ExecutionEnvironment/StreamExecutionEnvironment to be passed to 
> the TableEnvironment.
> {code}
> EnvironmentSettings bbSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15384) Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment

2019-12-24 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17002847#comment-17002847
 ] 

Jeff Zhang edited comment on FLINK-15384 at 12/24/19 2:02 PM:
--

\cc [~aljoscha] [~kkloudas] [~tison] [~lzljs3620320] [~twalthr]


was (Author: zjffdu):
\cc [~aljoscha] [~kkloudas] [~tison] [~lzljs3620320] 

> Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to 
> TableEnvironment 
> --
>
> Key: FLINK-15384
> URL: https://issues.apache.org/jira/browse/FLINK-15384
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> In the new approach of creating a TableEnvironment, it is not possible to 
> pass an ExecutionEnvironment/StreamExecutionEnvironment to the 
> TableEnvironment. This makes all the features of 
> ExecutionEnvironment/StreamExecutionEnvironment, such as adding a JobListener 
> or executing a job asynchronously, unavailable in the Table API. So I suggest 
> allowing an ExecutionEnvironment/StreamExecutionEnvironment to be passed to 
> the TableEnvironment.
> {code}
> EnvironmentSettings bbSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15384) Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment

2019-12-24 Thread Jeff Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17002847#comment-17002847
 ] 

Jeff Zhang commented on FLINK-15384:


\cc [~aljoscha] [~kkloudas] [~tison]

> Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to 
> TableEnvironment 
> --
>
> Key: FLINK-15384
> URL: https://issues.apache.org/jira/browse/FLINK-15384
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> In the new approach of creating a TableEnvironment, it is not possible to 
> pass an ExecutionEnvironment/StreamExecutionEnvironment. This makes all the 
> features of ExecutionEnvironment/StreamExecutionEnvironment, such as adding a 
> JobListener or executing a job asynchronously, unavailable in the Table API. 
> So I suggest allowing an ExecutionEnvironment/StreamExecutionEnvironment to 
> be passed to the TableEnvironment.
> {code}
> EnvironmentSettings bbSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15384) Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment

2019-12-24 Thread Jeff Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-15384:
---
Description: 
In the new approach of creating a TableEnvironment, it is not possible to pass 
an ExecutionEnvironment/StreamExecutionEnvironment to the TableEnvironment. 
This makes all the features of ExecutionEnvironment/StreamExecutionEnvironment, 
such as adding a JobListener or executing a job asynchronously, unavailable in 
the Table API. So I suggest allowing an 
ExecutionEnvironment/StreamExecutionEnvironment to be passed to the 
TableEnvironment.

{code}
EnvironmentSettings bbSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
{code}


  was:
In the new approach of creating a TableEnvironment, it is not possible to pass 
an ExecutionEnvironment/StreamExecutionEnvironment. This makes all the features 
of ExecutionEnvironment/StreamExecutionEnvironment, such as adding a 
JobListener or executing a job asynchronously, unavailable in the Table API. So 
I suggest allowing an ExecutionEnvironment/StreamExecutionEnvironment to be 
passed to the TableEnvironment.

{code}
EnvironmentSettings bbSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
{code}



> Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to 
> TableEnvironment 
> --
>
> Key: FLINK-15384
> URL: https://issues.apache.org/jira/browse/FLINK-15384
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Jeff Zhang
>Priority: Major
>
> In the new approach of creating a TableEnvironment, it is not possible to 
> pass an ExecutionEnvironment/StreamExecutionEnvironment to the 
> TableEnvironment. This makes all the features of 
> ExecutionEnvironment/StreamExecutionEnvironment, such as adding a JobListener 
> or executing a job asynchronously, unavailable in the Table API. So I suggest 
> allowing an ExecutionEnvironment/StreamExecutionEnvironment to be passed to 
> the TableEnvironment.
> {code}
> EnvironmentSettings bbSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15384) Allow to pass ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment

2019-12-24 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-15384:
--

 Summary: Allow to pass 
ExecutionEnvironment/StreamExecutionEnvironment to TableEnvironment 
 Key: FLINK-15384
 URL: https://issues.apache.org/jira/browse/FLINK-15384
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: Jeff Zhang


In the new approach of creating a TableEnvironment, it is not possible to pass 
an ExecutionEnvironment/StreamExecutionEnvironment. This makes all the features 
of ExecutionEnvironment/StreamExecutionEnvironment, such as adding a 
JobListener or executing a job asynchronously, unavailable in the Table API. So 
I suggest allowing an ExecutionEnvironment/StreamExecutionEnvironment to be 
passed to the TableEnvironment.

{code}
EnvironmentSettings bbSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

