从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印

请问是不支持pojo流注册表吗?只能是Row类型吗?

下面是相关代码



        //1.创建执行环境
        StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();

        //全局参数设置
        streamEnv.getConfig().setGlobalJobParameters(parameters2);

        //table env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);


        //2.读取kafka | other source

//        DataStream<String> dataStream = null;

//        if ("kafka".equalsIgnoreCase(sourceType)) {

            //用jsonString反序列化

//            dataStream = FlinkUtils.createKafkaStream(parameters2, 
SimpleStringSchema.class);

//        }

        //###############定义消费kafka source##############
        Properties props = new Properties();
        //指定Ka fka的Broker地址
        props.setProperty("bootstrap.servers", 
parameters2.getRequired("bootstrap.servers"));
        //指定组ID
        props.setProperty("group.id", parameters2.get("group.id"));
        //如果没有记录偏移量,第一次从最开始消费
//        props.setProperty("auto.offset.reset", 
parameters.get("auto.offset.reset","earliest"));
        //kafka的消费者不自动提交偏移量
        props.setProperty("enable.auto.commit", 
parameters2.get("enable.auto.commit","false"));

        List<String> topics = 
Arrays.asList(parameters2.get("topics").split(","));



        //new KafkaSource instance
        FlinkKafkaConsumer<String> kafkaConsumer = new 
FlinkKafkaConsumer<String>(
                topics,
                SimpleStringSchema.class.newInstance(),
                props);

        //得到kafka流
        DataStreamSource<String> dataStream = 
streamEnv.addSource(kafkaConsumer);

        //3.映射为实体
        SingleOutputStreamOperator map = dataStream.map(new 
Map2EntityFunction()).returns(Class.forName(sourceClass));

        //4.注册一个实例获取column names
        Class<?> clz = Class.forName(sourceClass);
        Object vo = clz.newInstance();
        StringBuilder columnBuilder = new StringBuilder();
        Field[] declaredFields = vo.getClass().getDeclaredFields();
        for (int i = 0; i < declaredFields.length; i++) {
            String fieldName = declaredFields[i].getName();
            columnBuilder.append(fieldName);
            if (i < declaredFields.length - 1) {
                columnBuilder.append(",");
            }
        }

        String fieldsDeclare = columnBuilder.toString();

        System.err.println(fieldsDeclare);

        //5.注册数据表 --注意! 【这里的表名和字段名需要和待处理的执行表达式对应上,对应不上查询会报错】
        tEnv.registerDataStream(sourceName, map,fieldsDeclare);

        //6.执行语句
        Table table = tEnv.sqlQuery(executiveSql);

        //7.print
        tEnv.toAppendStream(table, Row.class).print();//运行时这里不会打印出结果

   
        //8.execute 
        streamEnv.execute(jobName);
                
                
                
                
-------------------------------------------------------------------------------------------------------------------------------
                
/**
 * 
 * 根据传入的映射类返回一个通用的POJO流
 */
public class Map2EntityFunction<T> extends RichMapFunction<String, T> {


    @Override
    public T map(String s) throws Exception {
        System.err.println("receive kafka msg--->"+s); //每次收到消息这里会打印
        ParameterTool params = (ParameterTool) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String sourceClass = params.getRequired("sourceClass");
        Preconditions.checkNotNull(sourceClass);
        Class<T> clz = (Class<T>) Class.forName(sourceClass);
        return JsonUtil.json2object(s, clz);
    }


}
                
---------------------------------------------------------------------------------------------------------------------------------------
         

回复