Hi, I have encountered a problem with Flink SQL.
My code:
DataSet<MarketDataInfo> dataSet0 = env.fromCollection( infos0 );
tableEnv.registerDataSet( "table0", dataSet0 );
String sql = "select closePrice from table0"
Table table = tableEnv.sql( sql );
tableEnv.registerTable( tableName, table );
DataSet<Row> redyData = tableEnv.toDataSet( table, Row.class );
This works fine.
But when I change the SQL to "select distinct closePrice from table0",
"tableEnv.toDataSet" throws an exception:
java.lang.AssertionError: Internal error: Error occurred while applying rule DataSetAggregateRule
at org.apache.calcite.util.Util.newInternal(Util.java:792)
at
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.
java:149)
at
org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225)
at
org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:118)
at
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java
:214)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.ja
va:825)
at
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334)
at
org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnviron
ment.scala:253)
at
org.apache.flink.api.java.table.BatchTableEnvironment.toDataSet(BatchTableEn
vironment.scala:146)
at
com.streamingedge.marketreport.analytics.flink.FlinkDataSetAnalytics.analize
(FlinkDataSetAnalytics.java:96)
at
com.streamingedge.marketreport.webserver.AnalyticsServlet.processRequest(Ana
lyticsServlet.java:117)
at
com.streamingedge.marketreport.webserver.AnalyticsServlet.doPost(AnalyticsSe
rvlet.java:40)
at
com.streamingedge.marketreport.webserver.AnalyticsServlet.doGet(AnalyticsSer
vlet.java:35)
at
javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
at
javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
at
org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
at
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
at
org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java
:229)
at
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java
:1086)
at
org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:427)
at
org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:
193)
at
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:
1020)
at
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135
)
at
org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHand
lerCollection.java:255)
at
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:1
16)
at org.eclipse.jetty.server.Server.handle(Server.java:366)
at
org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpCo
nnection.java:494)
at
org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpC
onnection.java:973)
at
org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplet
e(AbstractHttpConnection.java:1035)
at
org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:641)
at
org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:231)
at
org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java
:82)
at
org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.
java:696)
at
org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.j
ava:53)
at
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:
608)
at
org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:5
43)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.api.table.TableException: Unsupported data type encountered
at
org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$$anonfun$estimateRo
wSize$2.apply(DataSetRel.scala:65)
at
org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$$anonfun$estimateRo
wSize$2.apply(DataSetRel.scala:53)
at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:5
1)
at
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scal
a:60)
at
scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:47)
at
org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$class.estimateRowSi
ze(DataSetRel.scala:53)
at
org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.estimateRowSi
ze(DataSetAggregate.scala:38)
at
org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.computeSelfCo
st(DataSetAggregate.scala:80)
at
org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulative
Cost(RelMdPercentageOriginalRows.java:162)
at
GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown
Source)
at
GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown
Source)
at
org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMet
adataQuery.java:258)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:1
134)
at
org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubs
et.java:336)
at
org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubse
t.java:319)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.ja
va:1838)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.j
ava:1774)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:
1038)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlann
er.java:1058)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlann
er.java:1950)
at
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.
java:137)
... 35 more