Re: Flink SQL: primary keys error when writing group-aggregated data to an HBase table

2020-03-09, posted by Jark Wu
Yes, age is not a valid reference inside the ROW(...) expression, because the
subquery no longer outputs an age column. You should extract the age from
age_name instead. Try the following query:

insert into resume01
select age_name,ROW(SUBSTR(age_name, 0, INSTR(age_name, '_') - 1),mobile)
from (
select CONCAT_WS('_',age,name) as age_name,sum(cast(mobile as bigint)) as mobile
from source_resume group by CONCAT_WS('_',age,name)
) as tt
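
To make the intent of the SUBSTR/INSTR expression concrete, here is a minimal plain-Java sketch of the same string operation: take everything before the first underscore of age_name. It does not call Flink. The class name AgeNameSplit and the method extractAge are made up for illustration, and the sketch assumes the age value itself contains no underscore.

```java
class AgeNameSplit {
    /** Returns the part of ageName before the first '_' (the whole string if there is none). */
    static String extractAge(String ageName) {
        int idx = ageName.indexOf('_');
        return idx < 0 ? ageName : ageName.substring(0, idx);
    }

    public static void main(String[] args) {
        // "2015-03-27_ellis" is how CONCAT_WS('_', age, name) would look for the first test row.
        System.out.println(extractAge("2015-03-27_ellis"));
    }
}
```

If an age value could itself contain an underscore, splitting on the first underscore would cut it short, so a different separator would be needed.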


On Tue, 10 Mar 2020 at 10:46,  wrote:

> Hello, thanks for your reply. I ran the SQL you wrote, and it reports that the age column does not exist in the table:
> String sql = "insert into resume01 \n" +
> "  select age_name,ROW(age,mobile)\n" +
> "from (\n" +
> " select CONCAT_WS('_',age,name) as age_name,sum(cast(mobile as bigint)) as mobile \n" +
> "  from source_resume group by CONCAT_WS('_',age,name) \n" +
> " ) as tt ";
> tableEnv.sqlUpdate(sql);
> Running it produces the following error:
> org.apache.flink.table.api.ValidationException: SQL validation failed. From line 2, column 31 to line 2, column 33: Column 'age' not found in any table
>
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> at com.shiji.sdp.flink.HbaseTest.testWriterToHBase(HbaseTest.java:75)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:59)
> at org.junit.internal.runners.MethodRoadie.runTestMethod(MethodRoadie.java:98)
> at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:79)
> at org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:87)
> at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:77)
> at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:42)
> at org.junit.internal.runners.JUnit4ClassRunner.invokeTestMethod(JUnit4ClassRunner.java:88)
> at org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRunner.java:51)
> at org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.java:44)
> at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:27)
> at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:37)
> at org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.java:42)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:130)
> at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, column 31 to line 2, column 33: Column 'age' not found in any table
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>
>
>
> String sql = "insert into resume01 \n" +
> "  select age_name,ROW(age,mobile)\n" +
> "from (\n" +
> " select CONCAT_WS('_',age,name) as age_name,sum(cast(mobile as bigint)) as mobile,age as age \n" +
> "  from source_resume group by CONCAT_WS('_',age,name),age \n" +
> " ) as tt ";
> tableEnv.sqlUpdate(sql);
>
> After adding age and submitting again, the original error comes back:
> org.apache.flink.table.api.TableException: UpsertStreamTableSink requires
> that 

Re: Flink SQL: primary keys error when writing group-aggregated data to an HBase table

2020-03-09, posted by Jark Wu
Hi,

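Currently, when Flink SQL inserts data into a database, it requires the key of
the query to be the same as the key of the result table. For HBase the key is
always the rowkey, but here the key of the query has been lost (concat_ws drops
the key property). You therefore need to group by concat_ws(..) directly, so
that the query has a key and that key corresponds to the HBase rowkey. So your
query needs to be changed to this: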

insert into resume01
select age_name,ROW(age,mobile)
from (
select CONCAT_WS('_',age,name) as age_name,sum(cast(mobile as bigint)) as mobile
from source_resume group by CONCAT_WS('_',age,name)
) as tt
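
The effect of grouping by the concatenated key can be illustrated with a small plain-Java model: each distinct CONCAT_WS('_', age, name) value plays the role of the HBase rowkey, and an upsert sink needs exactly such a key to know which row to overwrite when an aggregate changes. This is only a sketch of the idea, not Flink or HBase API. The class UpsertByRowkey and its methods are hypothetical, and the sample values are taken from the test data in the original post.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class UpsertByRowkey {
    /** Builds the key the way CONCAT_WS('_', age, name) is used in the query (non-null inputs assumed). */
    static String rowkey(String age, String name) {
        return age + "_" + name;
    }

    /** Sums mobile per rowkey; every update replaces the stored value for that key, i.e. an upsert. */
    static Map<String, Long> aggregate(String[][] rows) {
        Map<String, Long> sink = new LinkedHashMap<>();
        for (String[] r : rows) {
            // r = {name, age, mobile}
            sink.merge(rowkey(r[1], r[0]), Long.parseLong(r[2]), Long::sum);
        }
        return sink;
    }

    public static void main(String[] args) {
        String[][] rows = {
            {"ellis", "2015-03-27", "17352837822"},
            {"ellis", "2015-03-28", "17352837825"},
            {"ellis", "2015-03-279", "17352837826"},
        };
        aggregate(rows).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

Because the grouping key and the sink key are the same value, every update produced by the aggregation maps to exactly one stored row. That is the correspondence between the query key and the rowkey described above.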

Best,
Jark

On Mon, 9 Mar 2020 at 21:03, psyche19830...@163.com wrote:

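> Hi all,
>   I have recently been looking into the Flink HBase connector. In a test, writing aggregated data to HBase fails with an error. I hope you can help. The code is as follows: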
> /**
>  * @Author: ellis.guan
>  * @Description: HBase test class
>  * @Date: 2020/3/6 15:41
>  * @Version: 1.0
>  */
> public class HbaseTest {
> private StreamExecutionEnvironment env;
> private StreamTableEnvironment tableEnv;
>
> @Before
> public void init(){
> env=StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> tableEnv = StreamTableEnvironment.create(env, settings);
> tableEnv.sqlUpdate("create table resume01(\n" +
> " `rowkey` string,sdp_columns_family ROW BIGINT> \n" +
> //" `binfo` ROW,\n" +
> //" edu ROW,  \n" +
> //" work ROW  \n" +
> ") with (" +
> " 'connector.type' = 'hbase',  " +
> " 'connector.version' = '1.4.3', " +
> " 'connector.table-name' = 'resume01'," +
> " 'connector.zookeeper.quorum' = 'localhost:2181'," +
> " 'connector.zookeeper.znode.parent' = '/hbase'" +
> ")");
> }
> @Test
> public void testReadFromHBase() throws Exception {
> //HBaseTableSource resume = new HBaseTableSource();
> Table table = tableEnv.sqlQuery("select * from resume");
> DataStream<Tuple2<Boolean, Row>> out = tableEnv.toRetractStream(table, Row.class);
> out.print();
> env.execute();
> }
>
> @Test
> public void testWriterToHBase() throws Exception {
> DataStream<Row> source = env.fromElements(
> Row.of("ellis","2015-03-27","17352837822","changsha","hun nan","shiji"),
> Row.of("ellis","2015-03-28","17352837825","changsha1","hun nan","shiji"),
> Row.of("ellis","2015-03-279","17352837826","changsha2","hun nan","shiji"));
>
> tableEnv.createTemporaryView("source_resume",source,"name,age,mobile,site,university,company1");
> tableEnv.sqlUpdate("insert into resume01 select CONCAT_WS('_',age,name),ROW(age,mobile) from " +
> " (select name,age,sum(cast(mobile as bigint)) as mobile from source_resume group by name,age ) as tt");
> env.execute();
> }
> }
>
> Running it fails with the following error:
> org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
>
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> at com.shiji.sdp.flink.HbaseTest.testWriterToHBase(HbaseTest.java:59)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at 

Flink SQL: primary keys error when writing group-aggregated data to an HBase table

2020-03-09, posted by psyche19830...@163.com
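Hi all,
  I have recently been looking into the Flink HBase connector. In a test, writing aggregated data to HBase fails with an error. I hope you can help. The code is as follows: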
/**
 * @Author: ellis.guan
 * @Description: HBase test class
 * @Date: 2020/3/6 15:41
 * @Version: 1.0
 */
public class HbaseTest {
private StreamExecutionEnvironment env;
private StreamTableEnvironment tableEnv;

@Before
public void init(){
env=StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
tableEnv = StreamTableEnvironment.create(env, settings);
tableEnv.sqlUpdate("create table resume01(\n" +
" `rowkey` string,sdp_columns_family ROW \n" +
//" `binfo` ROW,\n" +
//" edu ROW,  \n" +
//" work ROW  \n" +
") with (" +
" 'connector.type' = 'hbase',  " +
" 'connector.version' = '1.4.3', " +
" 'connector.table-name' = 'resume01'," +
" 'connector.zookeeper.quorum' = 'localhost:2181'," +
" 'connector.zookeeper.znode.parent' = '/hbase'" +
")");
}
@Test
public void testReadFromHBase() throws Exception {
//HBaseTableSource resume = new HBaseTableSource();
Table table = tableEnv.sqlQuery("select * from resume");
DataStream<Tuple2<Boolean, Row>> out = tableEnv.toRetractStream(table, Row.class);
out.print();
env.execute();
}

@Test
public void testWriterToHBase() throws Exception {
DataStream<Row> source = env.fromElements(
Row.of("ellis","2015-03-27","17352837822","changsha","hun nan","shiji"),
Row.of("ellis","2015-03-28","17352837825","changsha1","hun nan","shiji"),
Row.of("ellis","2015-03-279","17352837826","changsha2","hun nan","shiji"));

tableEnv.createTemporaryView("source_resume",source,"name,age,mobile,site,university,company1");
tableEnv.sqlUpdate("insert into resume01 select CONCAT_WS('_',age,name),ROW(age,mobile) from " +
" (select name,age,sum(cast(mobile as bigint)) as mobile from source_resume group by name,age ) as tt");
env.execute();
}
}

Running it fails with the following error:
org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.

at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at com.shiji.sdp.flink.HbaseTest.testWriterToHBase(HbaseTest.java:59)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:59)
at org.junit.internal.runners.MethodRoadie.runTestMethod(MethodRoadie.java:98)
at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:79)
at org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:87)
at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:77)
at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:42)
at org.junit.internal.runners.JUnit4ClassRunner.invokeTestMethod(JUnit4ClassRunner.java:88)
at