从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印
请问是不支持pojo流注册表吗?只能是Row类型吗?
下面是相关代码
//1.创建执行环境
StreamExecutionEnvironment streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
//全局参数设置
streamEnv.getConfig().setGlobalJobParameters(parameters2);
//table env
StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);
//2.读取kafka | other source
// DataStream<String> dataStream = null;
// if ("kafka".equalsIgnoreCase(sourceType)) {
//用jsonString反序列化
// dataStream = FlinkUtils.createKafkaStream(parameters2,
SimpleStringSchema.class);
// }
//###############定义消费kafka source##############
Properties props = new Properties();
//指定Ka fka的Broker地址
props.setProperty("bootstrap.servers",
parameters2.getRequired("bootstrap.servers"));
//指定组ID
props.setProperty("group.id", parameters2.get("group.id"));
//如果没有记录偏移量,第一次从最开始消费
// props.setProperty("auto.offset.reset",
parameters.get("auto.offset.reset","earliest"));
//kafka的消费者不自动提交偏移量
props.setProperty("enable.auto.commit",
parameters2.get("enable.auto.commit","false"));
List<String> topics =
Arrays.asList(parameters2.get("topics").split(","));
//new KafkaSource instance
FlinkKafkaConsumer<String> kafkaConsumer = new
FlinkKafkaConsumer<String>(
topics,
SimpleStringSchema.class.newInstance(),
props);
//得到kafka流
DataStreamSource<String> dataStream =
streamEnv.addSource(kafkaConsumer);
//3.映射为实体
SingleOutputStreamOperator map = dataStream.map(new
Map2EntityFunction()).returns(Class.forName(sourceClass));
//4.注册一个实例获取column names
Class<?> clz = Class.forName(sourceClass);
Object vo = clz.newInstance();
StringBuilder columnBuilder = new StringBuilder();
Field[] declaredFields = vo.getClass().getDeclaredFields();
for (int i = 0; i < declaredFields.length; i++) {
String fieldName = declaredFields[i].getName();
columnBuilder.append(fieldName);
if (i < declaredFields.length - 1) {
columnBuilder.append(",");
}
}
String fieldsDeclare = columnBuilder.toString();
System.err.println(fieldsDeclare);
//5.注册数据表 --注意! 【这里的表名和字段名需要和待处理的执行表达式对应上,对应不上查询会报错】
tEnv.registerDataStream(sourceName, map,fieldsDeclare);
//6.执行语句
Table table = tEnv.sqlQuery(executiveSql);
//7.print
tEnv.toAppendStream(table, Row.class).print();//运行时这里不会打印出结果
//8.execute
streamEnv.execute(jobName);
-------------------------------------------------------------------------------------------------------------------------------
/**
*
* 根据传入的映射类返回一个通用的POJO流
*/
public class Map2EntityFunction<T> extends RichMapFunction<String, T> {
@Override
public T map(String s) throws Exception {
System.err.println("receive kafka msg--->"+s); //每次收到消息这里会打印
ParameterTool params = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
String sourceClass = params.getRequired("sourceClass");
Preconditions.checkNotNull(sourceClass);
Class<T> clz = (Class<T>) Class.forName(sourceClass);
return JsonUtil.json2object(s, clz);
}
}
---------------------------------------------------------------------------------------------------------------------------------------