I've figured it out: the problem was with the conditions in my query SQL, and it's fixed now.
Thanks
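
For anyone who finds this thread later, here is a minimal sketch of the POJO-stream-to-table pattern from the code quoted below. The Order POJO and its fields (id, amount) are hypothetical stand-ins for my real classes, and the imports assume the pre-1.11 Java bridge packages. The point that bit me: the field names registered for the table must match the names used in the SQL, and the WHERE condition has to actually match some incoming records, otherwise toAppendStream(...).print() stays silent.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class PojoTableSketch {

    // Hypothetical POJO; Flink needs a public no-arg constructor and public fields (or getters/setters)
    public static class Order {
        public long id;
        public double amount;
        public Order() {}
        public Order(long id, double amount) { this.id = id; this.amount = amount; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // A POJO-typed stream; registering it as a table works, it does not have to be Row
        DataStream<Order> orders = env.fromElements(new Order(1L, 12.5), new Order(2L, 0.0));

        // The field list must use exactly the names referenced in the SQL below
        tEnv.registerDataStream("orders", orders, "id,amount");

        // The condition must be satisfiable by the data, otherwise nothing is ever printed
        Table result = tEnv.sqlQuery("SELECT id, amount FROM orders WHERE amount > 0");
        tEnv.toAppendStream(result, Row.class).print();

        env.execute("pojo-table-sketch");
    }
}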

Sent from my iPhone

> On 25 Aug 2020, at 16:12, yang zhang <[email protected]> wrote:
> 
> I read data from Kafka, convert it into a POJO-typed DataStream, and register that stream as a table. When new messages arrive, the query result from the registered table is not printed.
> 
> Is registering a POJO stream as a table not supported? Does it have to be a Row-typed stream?
> 
> The relevant code is below:
> 
> 
> 
>        //1. Create the execution environment
>        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> 
>        // Set the global job parameters
>        streamEnv.getConfig().setGlobalJobParameters(parameters2);
> 
>        // Table environment
>        StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);
> 
> 
>        //2. Read from Kafka | other source
> 
> //        DataStream<String> dataStream = null;
> //        if ("kafka".equalsIgnoreCase(sourceType)) {
>            // Deserialize the JSON string
> //            dataStream = FlinkUtils.createKafkaStream(parameters2, SimpleStringSchema.class);
> //        }
> 
>        //############### Define the Kafka consumer source ##############
>        Properties props = new Properties();
>        // Kafka broker addresses
>        props.setProperty("bootstrap.servers", parameters2.getRequired("bootstrap.servers"));
>        // Consumer group id
>        props.setProperty("group.id", parameters2.get("group.id"));
>        // If no offset has been committed yet, start consuming from the earliest offset
> //        props.setProperty("auto.offset.reset", parameters.get("auto.offset.reset", "earliest"));
>        // Do not let the Kafka consumer auto-commit offsets
>        props.setProperty("enable.auto.commit", parameters2.get("enable.auto.commit", "false"));
> 
>        List<String> topics = Arrays.asList(parameters2.get("topics").split(","));
> 
> 
> 
>        // Create the Kafka consumer
>        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(
>                topics,
>                new SimpleStringSchema(),
>                props);
> 
>        // Obtain the Kafka stream
>        DataStreamSource<String> dataStream = streamEnv.addSource(kafkaConsumer);
> 
>        //3. Map each message to the POJO entity type
>        SingleOutputStreamOperator map = dataStream.map(new Map2EntityFunction()).returns(Class.forName(sourceClass));
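>        // (returns(...) supplies the output type explicitly, since it cannot be
>        // inferred for the generic Map2EntityFunction because of type erasure)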
> 
>        //4. Instantiate the class once to collect the column names via reflection
>        Class<?> clz = Class.forName(sourceClass);
>        Object vo = clz.newInstance();
>        StringBuilder columnBuilder = new StringBuilder();
>        Field[] declaredFields = vo.getClass().getDeclaredFields();
>        for (int i = 0; i < declaredFields.length; i++) {
>            String fieldName = declaredFields[i].getName();
>            columnBuilder.append(fieldName);
>            if (i < declaredFields.length - 1) {
>                columnBuilder.append(",");
>            }
>        }
> 
>        String fieldsDeclare = columnBuilder.toString();
> 
>        System.err.println(fieldsDeclare);
> 
>        //5. Register the table -- NOTE: the table name and field names here must match the ones used in the SQL expression to be executed; otherwise the query fails
>        tEnv.registerDataStream(sourceName, map, fieldsDeclare);
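>        // Example with hypothetical values: if sourceName is "orders" and fieldsDeclare
>        // resolves to "id,amount", then executiveSql must reference exactly those names,
>        // e.g. SELECT id, amount FROM orders WHERE amount > 0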
> 
>        //6. Run the SQL query
>        Table table = tEnv.sqlQuery(executiveSql);
> 
>        //7.print
>        tEnv.toAppendStream(table, Row.class).print(); // nothing gets printed here at runtime
> 
> 
>        //8.execute 
>        streamEnv.execute(jobName);
>        
>        
>        
>        
> -------------------------------------------------------------------------------------------------------------------------------
>        
> /**
> * Returns a generic POJO stream of the mapping class passed in via the job parameters.
> */
> public class Map2EntityFunction<T> extends RichMapFunction<String, T> {
> 
> 
>    @Override
>    public T map(String s) throws Exception {
>        System.err.println("receive kafka msg--->"+s); //每次收到消息这里会打印
>        ParameterTool params = (ParameterTool) 
> getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>        String sourceClass = params.getRequired("sourceClass");
>        Preconditions.checkNotNull(sourceClass);
>        Class<T> clz = (Class<T>) Class.forName(sourceClass);
>        return JsonUtil.json2object(s, clz);
>    }
> 
> 
> }
>        
> ---------------------------------------------------------------------------------------------------------------------------------------
>         
