Yes, the age in ROW(...) expression is invalid, you should extract the name
from age_name. Try the following query:

insert into resume01
select age_name,ROW(SUBSTR(age_name, 0, INSTR(age_name, '_') - 1),mobile)
from (
    select CONCAT_WS('_',age,name) as age_name,sum(cast(mobile as bigint))
as mobile
    from source_resume group by name,age
) as tt


On Tue, 10 Mar 2020 at 10:46, <psyche19830...@163.com> wrote:

> 您好,感谢您的回复,按照你写的sql 我运行了一下,报age列不存在表中:
> String sql = "insert into resume01 \n" +
>                 "          select age_name,ROW(age,mobile)\n" +
>                 "            from (\n" +
>                 "                     select CONCAT_WS('_',age,name) as
> age_name,sum(cast(mobile as bigint)) as mobile \n" +
>                 "                          from source_resume group by
> CONCAT_WS('_',age,name) \n" +
>                 "             ) as tt ";
>         tableEnv.sqlUpdate(sql);
> 运行后错误如下:
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> From line 2, column 31 to line 2, column 33: Column 'age' not found in any
> table
>
>         at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
>         at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>         at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>         at
> com.shiji.sdp.flink.HbaseTest.testWriterToHBase(HbaseTest.java:75)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:59)
>         at
> org.junit.internal.runners.MethodRoadie.runTestMethod(MethodRoadie.java:98)
>         at
> org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:79)
>         at
> org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:87)
>         at
> org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:77)
>         at
> org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:42)
>         at
> org.junit.internal.runners.JUnit4ClassRunner.invokeTestMethod(JUnit4ClassRunner.java:88)
>         at
> org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRunner.java:51)
>         at
> org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.java:44)
>         at
> org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:27)
>         at
> org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:37)
>         at
> org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.java:42)
>         at org.junit.runner.JUnitCore.run(JUnitCore.java:130)
>         at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>         at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>         at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>         at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line
> 2, column 31 to line 2, column 33: Column 'age' not found in any table
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>         at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>
>
>
> String sql = "insert into resume01 \n" +
>                 "          select age_name,ROW(age,mobile)\n" +
>                 "            from (\n" +
>                 "                     select CONCAT_WS('_',age,name) as
> age_name,sum(cast(mobile as bigint)) as mobile,age as age \n" +
>                 "                          from source_resume group by
> CONCAT_WS('_',age,name),age \n" +
>                 "             ) as tt ";
>         tableEnv.sqlUpdate(sql);
>
> 加上age后提交原来的错误:
> org.apache.flink.table.api.TableException: UpsertStreamTableSink requires
> that Table has a full primary keys if it is updated.
>
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113)
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>         at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>         at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>         at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>         at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>         at
> com.shiji.sdp.flink.HbaseTest.testWriterToHBase(HbaseTest.java:75)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:59)
>         at
> org.junit.internal.runners.MethodRoadie.runTestMethod(MethodRoadie.java:98)
>         at
> org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:79)
>         at
> org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:87)
>         at
> org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:77)
>         at
> org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:42)
>         at
> org.junit.internal.runners.JUnit4ClassRunner.invokeTestMethod(JUnit4ClassRunner.java:88)
>         at
> org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRunner.java:51)
>         at
> org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.java:44)
>         at
> org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:27)
>         at
> org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:37)
>         at
> org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.java:42)
>         at org.junit.runner.JUnitCore.run(JUnitCore.java:130)
>         at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>         at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>         at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>         at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
>
> _____________________________________
> Sent from http://apache-flink.147419.n8.nabble.com
>
>

回复