[jira] [Updated] (FLINK-16068) table with keyword-escaped columns and computed_column_expression columns

2020-02-24 Thread Yu Li (Jira)


[ https://issues.apache.org/jira/browse/FLINK-16068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yu Li updated FLINK-16068:
--
Fix Version/s: 1.11.0

> table with keyword-escaped columns and computed_column_expression columns
> -
>
> Key: FLINK-16068
> URL: https://issues.apache.org/jira/browse/FLINK-16068
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.10.0
>Reporter: pangliang
>Assignee: Benchao Li
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>

[jira] [Updated] (FLINK-16068) table with keyword-escaped columns and computed_column_expression columns

2020-02-16 Thread ASF GitHub Bot (Jira)


[ https://issues.apache.org/jira/browse/FLINK-16068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-16068:
---
Labels: pull-request-available  (was: )


[jira] [Updated] (FLINK-16068) table with keyword-escaped columns and computed_column_expression columns

2020-02-15 Thread Jark Wu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-16068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu updated FLINK-16068:

Priority: Critical  (was: Major)


[jira] [Updated] (FLINK-16068) table with keyword-escaped columns and computed_column_expression columns

2020-02-14 Thread Jark Wu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-16068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu updated FLINK-16068:

Fix Version/s: 1.10.1


[jira] [Updated] (FLINK-16068) table with keyword-escaped columns and computed_column_expression columns

2020-02-14 Thread pangliang (Jira)


[ https://issues.apache.org/jira/browse/FLINK-16068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

pangliang updated FLINK-16068:
--
Description: 
I used the SQL Client to create a table with a keyword-escaped column and a computed column expression, like this:
{code:java}
CREATE TABLE source_kafka (
log STRING,
`time` BIGINT,
pt as proctime()
) WITH (
  'connector.type' = 'kafka',   
  'connector.version' = 'universal',
  'connector.topic' = 'k8s-logs',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.zookeeper.connect' = 
'zk-1.zk:2181,zk-2.zk:2181,zk-3.zk:2181/kafka',
  'connector.properties.bootstrap.servers' = 'kafka.default:9092',
  'connector.properties.group.id' = 'testGroup',
  'format.type'='json',
  'format.fail-on-missing-field' = 'true',
  'update-mode' = 'append'
);
{code}
Then I simply queried it:
{code:java}
SELECT * from source_kafka limit 10;{code}
and got an exception:
{code:java}
java.io.IOException: Fail to run stream sql job
	at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:164)
	at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callSelect(FlinkStreamSqlInterpreter.java:108)
	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:203)
	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:151)
	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:104)
	at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:103)
	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:676)
	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:569)
	at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
	at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:121)
	at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "time" at line 1, column 12.
Was expecting one of:
    "ABS" ...
    "ARRAY" ...
    "AVG" ...
    "CARDINALITY" ...
    "CASE" ...
    "CAST" ...
    "CEIL" ...
    "CEILING" ...
    ..

	at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
	at org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
	at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
	at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
	at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:104)
	... 13 more
{code}
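Judging from the error location ({{Encountered "time" at line 1, column 12}}), the planner appears to re-parse an internally generated expression in which the escaped column name has lost its backticks. The sketch below is purely illustrative, not the actual statement the planner generates: the unescaped form fails to parse because TIME is a reserved keyword in the Calcite-based parser, while the re-escaped form parses fine:
{code:sql}
-- Hypothetical internally generated statement: fails to parse,
-- because `time` appears unquoted and TIME is a reserved keyword
SELECT log, time, PROCTIME() FROM source_kafka;

-- Re-escaping the identifier makes the same statement parse
SELECT log, `time`, PROCTIME() FROM source_kafka;
{code}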
I also ran some tests; the following variants work:
{code:java}
CREATE TABLE source_kafka (
log STRING,
`a` BIGINT,
pt as proctime()
)

CREATE TABLE source_kafka (
log STRING,