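Hi all,
Our company recently decided to use Flink for stream processing. While looking into reading and writing HBase with Flink SQL, I ran into a problem and would appreciate your help.
I created a table named resume in HBase's default namespace, with the following structure:
My Flink test code is as follows: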
@Test
public void testReadFromHBase() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    // HBaseTableSource resume = new HBaseTableSource();
    tableEnv.sqlUpdate("create table resume(\n" +
        " binfo ROW<>,\n" +
        " edu ROW<>, \n" +
        " work ROW<> \n" +
        ") with (" +
        " 'connector.type' = 'hbase', " +
        " 'connector.version' = '1.4.3', " +
        " 'connector.table-name' = 'resume'," +
        " 'connector.zookeeper.quorum' = 'flink01.flink.net:2181,flink02.flink:2181,flink03.flink:2181'," +
        " 'connector.zookeeper.znode.parent' = '/hbase'" +
        ")");
    Table table = tableEnv.sqlQuery("select * from resume");
    DataStream<Tuple2<Boolean, Row>> out = tableEnv.toRetractStream(table, Row.class);
    out.print();
    env.execute();
}
Running it fails with the following error:
org.apache.flink.table.api.ValidationException: Could not map binfo column to the underlying physical type root. No such field.
    at org.apache.flink.table.utils.TypeMappingUtils.lambda$null$7(TypeMappingUtils.java:223)
    at java.util.OptionalInt.orElseThrow(OptionalInt.java:189)
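
For comparison, the HBase connector DDL shown in the Flink documentation declares a row key column and lists each column family's qualifiers inside ROW<...>, which may be relevant to the error above. Below is a minimal sketch of how such a DDL string could be assembled. The row key column name (rowkey) and all qualifier names (name, age, school, company) are hypothetical placeholders, since the actual table structure is not shown in this message, and treating every column as STRING is an assumption as well.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class ResumeDdlSketch {

    // Renders one column family as "family ROW<q1 TYPE1, q2 TYPE2>".
    static String columnFamily(String family, Map<String, String> qualifiers) {
        StringJoiner fields = new StringJoiner(", ", family + " ROW<", ">");
        qualifiers.forEach((name, type) -> fields.add(name + " " + type));
        return fields.toString();
    }

    // Builds the CREATE TABLE statement; qualifier names are hypothetical.
    static String buildDdl() {
        Map<String, String> binfo = new LinkedHashMap<>();
        binfo.put("name", "STRING");   // hypothetical qualifier
        binfo.put("age", "STRING");    // hypothetical qualifier
        Map<String, String> edu = new LinkedHashMap<>();
        edu.put("school", "STRING");   // hypothetical qualifier
        Map<String, String> work = new LinkedHashMap<>();
        work.put("company", "STRING"); // hypothetical qualifier

        return "CREATE TABLE resume (\n"
            + "  rowkey STRING,\n"     // assumed name and type for the HBase row key
            + "  " + columnFamily("binfo", binfo) + ",\n"
            + "  " + columnFamily("edu", edu) + ",\n"
            + "  " + columnFamily("work", work) + "\n"
            + ") WITH (\n"
            + "  'connector.type' = 'hbase',\n"
            + "  'connector.version' = '1.4.3',\n"
            + "  'connector.table-name' = 'resume',\n"
            + "  'connector.zookeeper.quorum' = 'flink01.flink.net:2181,flink02.flink:2181,flink03.flink:2181',\n"
            + "  'connector.zookeeper.znode.parent' = '/hbase'\n"
            + ")";
    }

    public static void main(String[] args) {
        // Prints the statement so it can be inspected before use.
        System.out.println(buildDdl());
    }
}
```

The resulting string could be passed to tableEnv.sqlUpdate(...) in place of the inline DDL in the test, after replacing the placeholder qualifiers with the ones that actually exist in the HBase table.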