Attached are the two main code files.

--

Best Wishes
   Galen.K





On 2020-04-07 12:11:07, "酷酷的浑蛋" <apach...@163.com> wrote:
>Did the code set consumption to start from the earliest offset? It could also be that the code that commits offsets to Kafka set this to false. Your code is probably incomplete, so it's hard to tell from what's shown.
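>
>For reference on the offset-commit side, a minimal sketch (setCommitOffsetsOnCheckpoints is part of the FlinkKafkaConsumer base API and defaults to true when checkpointing is on):
>
>    // with checkpointing enabled, offsets are committed back to Kafka on each
>    // completed checkpoint; this affects what setStartFromGroupOffsets() sees,
>    // but not positions restored from a savepoint
>    consumer.setCommitOffsetsOnCheckpoints(true);
>    // without checkpointing, the plain Kafka property applies instead:
>    // props.put("enable.auto.commit", "true");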
>
>
>apache22
>apach...@163.com
>On 2020-04-07 12:03, 苟刚 <gougang_1...@163.com> wrote:
>
>
>
>Doesn't `latest` mean the position where consumption last stopped?
>What I still don't understand: if I don't add a new operator, starting from the savepoint works fine and does not consume from the beginning; this only happens after I add a new operator.
>
>--
>
>Best Wishes
>Galen.K
>
>
>
>
>
>On 2020-04-07 11:39:03, "sunfulin" <sunfulin0...@163.com> wrote:
>
>
>
>Hi,
>props.put("auto.offset.reset", "latest");
>It's probably this setting that causes it.
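>
>One note: auto.offset.reset is only consulted when the consumer group has no committed offset to fall back on. Flink's default start position is setStartFromGroupOffsets(); the start position can also be pinned explicitly on the consumer (a minimal sketch using the FlinkKafkaConsumer base API):
>
>    // default: resume from the group's committed offsets in Kafka;
>    // "auto.offset.reset" only applies when none exist
>    consumer.setStartFromGroupOffsets();
>    // or ignore committed offsets entirely:
>    // consumer.setStartFromEarliest();
>    // consumer.setStartFromLatest();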
>
>On 2020-04-07 11:27:53, "苟刚" <gougang_1...@163.com> wrote:
>Hello,
>I've run into a problem. I use Flink for real-time statistics, and every time I add a new computation type, the operators start consuming from the earliest Kafka messages, so after every restart it takes a long time to catch up. Is there any way to solve this?
>My watermark is assigned on the Kafka consumer, and each time a new process is added downstream, consumption starts from the beginning again.
>
>
>Flink version: 1.6.3
>
>Partial code below:
>
>public static void main(String[] args) throws Exception {
>    final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
>    StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
>
>    DataStreamSource<XlogStreamBasicBean> data = KafkaTools.buildSource(env);
>    // process the timing data
>    processTimingData(parameterTool, data);
>    // process the front-error data
>    processFrontErrorData(parameterTool, data);
>    // process the img-error data
>    processImgLoadErrorData(parameterTool, data);
>    env.execute("xlog compute");
>}
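>
>For the savepoint side: when a changed job graph is restored, state is matched to operators by uid, and a newly added operator simply has no state to restore; the restore is refused unless non-restored state is explicitly allowed. A sketch of the CLI call (the savepoint path and jar name are placeholders):
>
>    # -s resumes from a savepoint; -n / --allowNonRestoredState lets the job
>    # start even if some savepoint state no longer maps to an operator
>    flink run -s hdfs:///flink/savepoints/savepoint-xxxx -n xlog-compute.jar
>
>As long as the Kafka source keeps its uid across versions, its offsets are restored from the savepoint and auto.offset.reset is not consulted.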
>
>
>
>
>Kafka connection configuration:
>public static Properties buildKafkaProps(ParameterTool parameterTool) {
>    Properties props = parameterTool.getProperties();
>    props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS));
>    props.put("zookeeper.connect", parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
>    props.put("group.id", parameterTool.get(KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID));
>    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>    props.put("auto.offset.reset", "latest");
>    return props;
>}
>
>
>
>
>
>
>
>--
>
>Best Wishes
>Galen.K
>
public class StreamFromKafkaV2 {

    private static final Logger logger = LoggerFactory.getLogger(StreamFromKafkaV2.class);

    public static void main(String[] args) throws Exception {
        final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
        StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);

        DataStreamSource<XlogStreamBasicBean> data = KafkaTools.buildSource(env);
        // process the timing data
        processTimingData(parameterTool, data);
        // process the front-error data
        processFrontErrorData(parameterTool, data);
        // process the img-error data
        processImgLoadErrorData(parameterTool, data);
        env.execute("xlog compute");
    }

    /**
     * Process image-load failure events.
     *
     * @param parameterTool
     * @param data
     * @throws Exception
     */
    private static void processImgLoadErrorData(ParameterTool parameterTool, DataStreamSource<XlogStreamBasicBean> data) throws Exception {
        // keep only the IMG_LOAD_ERROR records (filtered by the DIY sub-type constant)
        SingleOutputStreamOperator<XlogStreamBasicBean> frontErrorStream = data
                .filter(filter -> DIY.equalsIgnoreCase(filter.getSubType()))
                .name("filter IMG_LOAD_ERROR data").uid("filter_img_load_error_data");

        // count IMG errors per page
        frontErrorStream
                .map(new MapFunction<XlogStreamBasicBean, ConvertBean>() {
                    @Override
                    public ConvertBean map(XlogStreamBasicBean xlogStreamBasicBean) throws Exception {
                        return new ConvertBean(xlogStreamBasicBean.getPage() + "|" + xlogStreamBasicBean.getKey(), 1L);
                    }
                }).name("IMG-ERROR type conversion").uid("map_img_load_error_data")
                .keyBy(event -> event.page)
                .timeWindow(Time.minutes(1))
                .reduce((ReduceFunction<ConvertBean>) (value1, value2) -> value1.add(value2.count), new WindowFunction<ConvertBean, MetricResultBean, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window, Iterable<ConvertBean> elements, Collector<MetricResultBean> out) throws Exception {
                        long count = 0;
                        for (ConvertBean element : elements) {
                            count += element.count;
                        }
                        // String.split takes a regex, so the pipe must be escaped;
                        // key.split("|") would split between every character
                        String[] pageKey = key.split("\\|");
                        MetricResultBean metricResultBean = MetricResultBean.builder()
                                .page(pageKey[0])
                                .count(count)
                                .dimension(ONE_MINUTE)
                                .name(pageKey[1])
                                .statisticsType(METRIC_TYPE_COUNT)
                                .time(DateTimeUtil.formatDate(window.getStart())).build();
                        generateId(metricResultBean);
                        out.collect(metricResultBean);
                    }
                }).name("IMG reduce step").uid("reduce_img_load_error_count")
                .addSink(getSinkFunction(parameterTool)).name("sink IMG-LOAD-ERROR metrics to kafka").uid("sink_img_load_error_to_kafka");
    }

    /**
     * Process front-end error events.
     *
     * @param parameterTool
     * @param data
     * @throws Exception
     */
    private static void processFrontErrorData(ParameterTool parameterTool, DataStreamSource<XlogStreamBasicBean> data) throws Exception {
        // keep only the FRONT_ERROR records
        SingleOutputStreamOperator<XlogStreamBasicBean> frontErrorStream = data
                .filter(filter -> FRONT_ERROR.equalsIgnoreCase(filter.getSubType()) && !NULL.equalsIgnoreCase(filter.getSubType()))
                .name("filter FRONT_ERROR data").uid("filter_front_error_data");

        // count JS errors per page
        frontErrorStream
                .map(new MapFunction<XlogStreamBasicBean, ConvertBean>() {
                    @Override
                    public ConvertBean map(XlogStreamBasicBean xlogStreamBasicBean) throws Exception {
                        return new ConvertBean(xlogStreamBasicBean.getPage(), 1L);
                    }
                }).name("JS type conversion").uid("map_js_error_data")
                .keyBy(event -> event.page)
                .timeWindow(Time.minutes(1))
                .reduce((ReduceFunction<ConvertBean>) (value1, value2) -> value1.add(value2.count), new WindowFunction<ConvertBean, MetricResultBean, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window, Iterable<ConvertBean> elements, Collector<MetricResultBean> out) throws Exception {
                        long count = 0;
                        for (ConvertBean element : elements) {
                            count += element.count;
                        }

                        MetricResultBean metricResultBean = MetricResultBean.builder()
                                .page(key)
                                .count(count)
                                .dimension(ONE_MINUTE)
                                .name("jsErrorCount")
                                .statisticsType(METRIC_TYPE_COUNT)
                                .time(DateTimeUtil.formatDate(window.getStart())).build();
                        generateId(metricResultBean);
                        out.collect(metricResultBean);
                    }
                }).name("JS reduce step").uid("reduce_js_error_count")
                .addSink(getSinkFunction(parameterTool)).name("sink JS-ERROR metrics to kafka").uid("sink_js_error_to_kafka");
    }

    /**
     * Process the timing data source.
     *
     * @param parameterTool
     * @param data
     * @throws Exception
     */
    private static void processTimingData(ParameterTool parameterTool, DataStreamSource<XlogStreamBasicBean> data) throws Exception {
        OutputTag<ConvertBean> pvOutputTag = new OutputTag<ConvertBean>("pv-output-tag") {
        };
        // keep only the TIMING records
        SingleOutputStreamOperator<XlogStreamBasicBean> timingStream = data
                .filter(filter -> TIMING.equalsIgnoreCase(filter.getType()) && !NULL.equalsIgnoreCase(filter.getType()))
                .name("filter TIMING data").uid("filter_timing_data");

        // compute per-page PV
        SingleOutputStreamOperator<MetricResultBean> pvStream = timingStream
                .map(new MapFunction<XlogStreamBasicBean, ConvertBean>() {
                    @Override
                    public ConvertBean map(XlogStreamBasicBean xlogStreamBasicBean) throws Exception {
                        return new ConvertBean(xlogStreamBasicBean.getPage(), 1L);
                    }
                }).name("PV type conversion").uid("map_data_new")
                .keyBy(event -> event.page)
                .timeWindow(Time.minutes(1))
                .allowedLateness(Time.minutes(1))
                .sideOutputLateData(pvOutputTag)
                .process(getPvWindowProcessFunction()).name("compute PV").uid("process_result_bean_new");
        pvStream.addSink(getSinkFunction(parameterTool)).name("sink PV to kafka").uid("sink_pv_to_kafka");

        // the late-data side output lives on the window operator's result, not on
        // the upstream filter stream, so it must be read from pvStream
        DataStream<ConvertBean> sideOutput = pvStream.getSideOutput(pvOutputTag);
        sideOutput.process(new ProcessFunction<ConvertBean, MetricResultBean>() {
            @Override
            public void processElement(ConvertBean value, Context ctx, Collector<MetricResultBean> out) throws Exception {
                MetricResultBean metricResultBean = MetricResultBean.builder()
                        .page(value.page)
                        .count(value.count)
                        .dimension(ONE_MINUTE)
                        .name("pv")
                        .statisticsType(METRIC_TYPE_COUNT)
                        .lateData(true)
                        // stamp late records with the current watermark
                        .time(DateTimeUtil.formatDate(ctx.timerService().currentWatermark())).build();
                generateId(metricResultBean);
                out.collect(metricResultBean);
            }
        }).name("handle late PV data").uid("process_pv_late_data")
                .addSink(getSinkFunction(parameterTool)).name("sink late PV data to kafka").uid("send_pv_late_data_to_kafka");

        // compute per-page UV, 1-minute dimension
        timingStream
                .map((MapFunction<XlogStreamBasicBean, Tuple3<String, String, String>>) m -> new Tuple3<>(m.getPage(), m.getCookieId(), ONE_MINUTE)).uid("uv_map_data" + ONE_MINUTE)
                .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING)).uid("uv-type-convert" + ONE_MINUTE)
                .keyBy(event -> event.f0)
                .timeWindow(Time.minutes(1))
                .process(getUvProcessWindowFunction(ONE_MINUTE))
                .addSink(getSinkFunction(parameterTool));

        // compute per-page UV, 5-minute dimension
        timingStream
                .map((MapFunction<XlogStreamBasicBean, Tuple3<String, String, String>>) m -> new Tuple3<>(m.getPage(), m.getCookieId(), FIVE_MINUTE)).uid("uv_map_data" + FIVE_MINUTE)
                .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING)).uid("uv-type-convert" + FIVE_MINUTE)
                .keyBy(event -> event.f0)
                .timeWindow(Time.minutes(5))
                .process(getUvProcessWindowFunction(FIVE_MINUTE))
                .addSink(getSinkFunction(parameterTool));

        // compute per-page UV, 1-hour dimension
        timingStream
                .map((MapFunction<XlogStreamBasicBean, Tuple3<String, String, String>>) m -> new Tuple3<>(m.getPage(), m.getCookieId(), ONE_HOUR)).uid("uv_map_data" + ONE_HOUR)
                .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING)).uid("uv-type-convert" + ONE_HOUR)
                .keyBy(event -> event.f0)
                .timeWindow(Time.hours(1))
                .process(getUvProcessWindowFunction(ONE_HOUR))
                .addSink(getSinkFunction(parameterTool));

        //TODO compute per-page UV, 1-day dimension
//        timingStream
//                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<XlogStreamBasicBean>() {
//                    @Override
//                    public long extractAscendingTimestamp(XlogStreamBasicBean element) {
//                        return element.getTimestamp() - 10000;
//                    }
//                }).uid("set_water_marks_uv" + ONE_HOUR)
//                .map((MapFunction<XlogStreamBasicBean, Tuple3<String, String, String>>) m -> new Tuple3<>(m.getPage(), m.getCookieId(), ONE_HOUR)).uid("uv_map_data" + ONE_HOUR)
//                .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING)).uid("uv-type-convert" + ONE_HOUR)
//                .keyBy(event -> event.f0)
//                .timeWindow(Time.days(1))
//                .process(getUvProcessWindowFunction(ONE_HOUR))
//                .addSink(getSinkFunction(parameterTool));

    }

    private static ProcessWindowFunction<ConvertBean, MetricResultBean, String, TimeWindow> getPvWindowProcessFunction() {
        return new ProcessWindowFunction<ConvertBean, MetricResultBean, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<ConvertBean> elements, Collector<MetricResultBean> out) throws Exception {
                long count = 0;
                for (ConvertBean element : elements) {
                    // sum the carried counts instead of merely counting elements
                    count += element.count;
                }
                MetricResultBean metricResultBean = MetricResultBean.builder()
                        .page(key)
                        .count(count)
                        .dimension(ONE_MINUTE)
                        .name("pv")
                        .statisticsType(METRIC_TYPE_COUNT)
                        .time(DateTimeUtil.formatDate(context.window().getStart())).build();
                generateId(metricResultBean);
                out.collect(metricResultBean);
            }
        };
    }

    /**
     * Window function that computes UV.
     *
     * @return
     */
    private static ProcessWindowFunction<Tuple3<String, String, String>, MetricResultBean, String, TimeWindow> getUvProcessWindowFunction(String dimension) {
        return new ProcessWindowFunction<Tuple3<String, String, String>, MetricResultBean, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<Tuple3<String, String, String>> elements, Collector<MetricResultBean> out) throws Exception {
                // per-window set of cookie ids seen so far (held in memory,
                // not in fault-tolerant state)
                Map<String, Boolean> userIdState = new HashMap<>();
                // running distinct count
                AtomicLong uvState = new AtomicLong(0);

                for (Tuple3<String, String, String> element : elements) {
                    if (!userIdState.containsKey(element.f1)) {
                        userIdState.put(element.f1, null);
                        uvState.getAndIncrement();
                    }
                }
                MetricResultBean metricResultBean = MetricResultBean.builder()
                        .page(key)
                        .count(uvState.get())
                        // use the method parameter directly; a local variable
                        // used to shadow it while holding the same value (element.f2)
                        .dimension(dimension)
                        .name("uv")
                        .statisticsType(METRIC_TYPE_UNIQUE_COUNT)
                        .time(DateTimeUtil.formatDate(context.window().getStart())).build();
                generateId(metricResultBean);
                out.collect(metricResultBean);
            }
        };
    }

    /**
     * Build the Kafka producer used as the sink.
     *
     * @param parameterTool
     * @return
     */
    private static FlinkKafkaProducer011<MetricResultBean> getSinkFunction(ParameterTool parameterTool) {
        return new FlinkKafkaProducer011<>(
                parameterTool.get("kafka.sink.brokers"),
                parameterTool.get("kafka.sink.topic"), new MetricResultSchema()
        );
    }

    /**
     * Generate a unique id for the metric.
     *
     * @param bean
     */
    private static void generateId(MetricResultBean bean) {
        String idStr = bean.getTime() + bean.getDimension() + bean.getName() + bean.getPage();
        long hashId = Hashing.murmur3_128(5).hashUnencodedChars(idStr).asLong();
        bean.setId(String.valueOf(hashId));
    }
}
public class KafkaTools {

    /**
     * Build the Kafka consumer properties.
     *
     * @param parameterTool
     * @return
     */
    public static Properties buildKafkaProps(ParameterTool parameterTool) {
        Properties props = parameterTool.getProperties();
        props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS));
        props.put("zookeeper.connect", parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
        props.put("group.id", parameterTool.get(KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID));
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");
        return props;
    }

    public static DataStreamSource<XlogStreamBasicBean> buildSource(StreamExecutionEnvironment env) throws IllegalAccessException {
        ParameterTool parameter = (ParameterTool) env.getConfig().getGlobalJobParameters();
        String topic = parameter.getRequired(PropertiesConstants.METRICS_TOPIC);
        Long time = parameter.getLong(PropertiesConstants.CONSUMER_FROM_TIME, 0L);
        return buildSource(env, topic, time);
    }


    /**
     * @param env
     * @param topic
     * @param time  subscription start time (currently read but never applied)
     * @return
     * @throws IllegalAccessException
     */
    public static DataStreamSource<XlogStreamBasicBean> buildSource(StreamExecutionEnvironment env, String topic, Long time) throws IllegalAccessException {
        ParameterTool parameterTool = (ParameterTool) env.getConfig().getGlobalJobParameters();
        Properties props = buildKafkaProps(parameterTool);
        FlinkKafkaConsumer011<XlogStreamBasicBean> consumer = new FlinkKafkaConsumer011<>(
                topic,
                new MetricSchema(),
                props);
        consumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<XlogStreamBasicBean>() {
            @Override
            public long extractAscendingTimestamp(XlogStreamBasicBean element) {
                if (element == null || element.getTimestamp() == null) {
                    return System.currentTimeMillis();
                }
                return element.getTimestamp() - 10000;
            }
        });
        // note: neither an explicit start position nor a .uid(...) is set on the source
        return env.addSource(consumer);
    }

}
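
One thing that stands out in buildSource: the `time` parameter is read from configuration and documented as the subscription start time, but it is never applied to the consumer. A minimal sketch of wiring it up, assuming setStartFromTimestamp is available in this connector version (it is defined on FlinkKafkaConsumer010, which the 011 consumer extends); treat this as a suggestion, not the author's code:

    // inside buildSource(env, topic, time), before env.addSource(consumer):
    if (time != null && time > 0) {
        // start from the first record whose timestamp is >= time
        consumer.setStartFromTimestamp(time);
    } else {
        // resume from committed group offsets, falling back to auto.offset.reset
        consumer.setStartFromGroupOffsets();
    }
    // pinning the source's operator id keeps its offset state restorable
    // after the job graph changes:
    // env.addSource(consumer).uid("kafka-source-" + topic);

This also speaks to the original question: without an explicit uid on the source, adding operators can change the auto-generated operator ids, so the source's offset state may fail to map on restore and the consumer falls back to its configured start position.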
