附件是两份主要代码
-- Best Wishes Galen.K 在 2020-04-07 12:11:07,"酷酷的浑蛋" <apach...@163.com> 写道: >是不是代码中设置了从头消费,还有可能提交offset到kafka的代码中设置了false?因为你的代码应该不是全的,所以没法具体看 > > >| | >apache22 >| >| >apach...@163.com >| >签名由网易邮箱大师定制 >在2020年4月7日 12:03,苟刚<gougang_1...@163.com> 写道: > > > >latest 不是最后消费的位置吗? >另外我一直不明白的是,如果我不新增新的算子,从savepoint启动是没有问题的。不会从头开始消费,之后新增算子后才会出现这个情况。 > > > > > > > > > > >-- > >Best Wishes >Galen.K > > > > > >在 2020-04-07 11:39:03,"sunfulin" <sunfulin0...@163.com> 写道: > > > >Hi, >props.put("auto.offset.reset", "latest"); >是加了这个设置导致的吧 > > > > > > > > > > > > > > >在 2020-04-07 11:27:53,"苟刚" <gougang_1...@163.com> 写道: >Hello, >我遇到一个问题,我用flink做实时统计的时候,每次新增一种计算类型,算子就会从kafka的最早的消息开始消费,导致我每次重启后都需要花费好长的时间去追平记录,请问有什么办法解决吗? >我的watermark是设置在kafka的consumer上的,下面的每新增一个process的时候都会从头开始消费。 > > >flink版本:1.6.3 > >部分代码如下: > >public static void main(String[] args) throws Exception { >final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args); >StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool); > >DataStreamSource<XlogStreamBasicBean> data = KafkaTools.buildSource(env); >// 处理timing数据 >processTimingData(parameterTool, data); >// 处理front error数据 >processFrontErrorData(parameterTool, data); >// 处理img error数据 >processImgLoadErrorData(parameterTool, data); >env.execute("xlog compute"); >} > > > > >kafka的连接参数配置: >public static Properties buildKafkaProps(ParameterTool parameterTool) { >Properties props = parameterTool.getProperties(); >props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS, >DEFAULT_KAFKA_BROKERS)); >props.put("zookeeper.connect", parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, >DEFAULT_KAFKA_ZOOKEEPER_CONNECT)); >props.put("group.id", parameterTool.get(KAFKA_GROUP_ID, >DEFAULT_KAFKA_GROUP_ID)); >props.put("key.deserializer", >"org.apache.kafka.common.serialization.StringDeserializer"); >props.put("value.deserializer", >"org.apache.kafka.common.serialization.StringDeserializer"); >props.put("auto.offset.reset", "latest"); >return props; >} 
> > > > > > > >-- > >Best Wishes >Galen.K >
public class StreamFromKafkaV2 { static Logger logger = LoggerFactory.getLogger(StreamFromKafkaV2.class); public static void main(String[] args) throws Exception { final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args); StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool); DataStreamSource<XlogStreamBasicBean> data = KafkaTools.buildSource(env); // å¤çtimingæ°æ® processTimingData(parameterTool, data); // å¤çfront erroræ°æ® processFrontErrorData(parameterTool, data); // å¤çimg erroræ°æ® processImgLoadErrorData(parameterTool, data); env.execute("xlog compute"); } /** * å¤çå¾çå 载失败çä¿¡æ¯ * * @param parameterTool * @param data * @throws Exception */ private static void processImgLoadErrorData(ParameterTool parameterTool, DataStreamSource<XlogStreamBasicBean> data) throws Exception { // è·åè¿æ»¤åçIMG_LOAD_ERRORæ°æ® SingleOutputStreamOperator<XlogStreamBasicBean> frontErrorStream = data .filter(filter -> DIY.equalsIgnoreCase(filter.getSubType())) .name("è¿æ»¤IMG_LOAD_ERRORæ°æ®").uid("filter_img_laod_error_data"); // 计ç®å个页é¢çIMGé误æ°é frontErrorStream .map(new MapFunction<XlogStreamBasicBean, ConvertBean>() { @Override public ConvertBean map(XlogStreamBasicBean xlogStreamBasicBean) throws Exception { return new ConvertBean(xlogStreamBasicBean.getPage() + "|" + xlogStreamBasicBean.getKey(), 1L); } }).name("IMG-ERRORæ°æ®ç±»å转æ¢").uid("map_img_load_error_data") .keyBy(event -> event.page) .timeWindow(Time.minutes(1)) .reduce((ReduceFunction<ConvertBean>) (value1, value2) -> value1.add(value2.count), new WindowFunction<ConvertBean, MetricResultBean, String, TimeWindow>() { @Override public void apply(String key, TimeWindow window, Iterable<ConvertBean> elements, Collector<MetricResultBean> out) throws Exception { long count = 0; for (ConvertBean element : elements) { count += element.count; } String[] pageKey = key.split("|"); MetricResultBean metricResultBean = MetricResultBean.builder() .page(pageKey[0]) .count(count) 
.dimension(ONE_MINUTE) .name(pageKey[1]) .statisticsType(METRIC_TYPE_COUNT) .time(DateTimeUtil.formatDate(window.getStart())).build(); generateId(metricResultBean); out.collect(metricResultBean); } }).name("IMG-REDUCEè¿ç¨").uid("reduce_img_load_error_count") .addSink(getSinkFunction(parameterTool)).name("è¾åºIMG-LOAD-ERRORå¼å°kafka").uid("sink_img_load_error_to_kafka"); } /** * å¤çerrorçä¿¡æ¯ * * @param parameterTool * @param data * @throws Exception */ private static void processFrontErrorData(ParameterTool parameterTool, DataStreamSource<XlogStreamBasicBean> data) throws Exception { // è·åè¿æ»¤åçFRONT_ERRORæ°æ® SingleOutputStreamOperator<XlogStreamBasicBean> frontErrorStream = data .filter(filter -> FRONT_ERROR.equalsIgnoreCase(filter.getSubType()) && !NULL.equalsIgnoreCase(filter.getSubType())) .name("è¿æ»¤FRONT_ERRORæ°æ®").uid("filter_front_error_data"); // 计ç®å个页é¢çJSé误æ°é frontErrorStream .map(new MapFunction<XlogStreamBasicBean, ConvertBean>() { @Override public ConvertBean map(XlogStreamBasicBean xlogStreamBasicBean) throws Exception { return new ConvertBean(xlogStreamBasicBean.getPage(), 1L); } }).name("JSæ°æ®ç±»å转æ¢").uid("map_js_error_data") .keyBy(event -> event.page) .timeWindow(Time.minutes(1)) .reduce((ReduceFunction<ConvertBean>) (value1, value2) -> value1.add(value2.count), new WindowFunction<ConvertBean, MetricResultBean, String, TimeWindow>() { @Override public void apply(String key, TimeWindow window, Iterable<ConvertBean> elements, Collector<MetricResultBean> out) throws Exception { long count = 0; for (ConvertBean element : elements) { count += element.count; } MetricResultBean metricResultBean = MetricResultBean.builder() .page(key) .count(count) .dimension(ONE_MINUTE) .name("jsErrorCount") .statisticsType(METRIC_TYPE_COUNT) .time(DateTimeUtil.formatDate(window.getStart())).build(); generateId(metricResultBean); out.collect(metricResultBean); } }).name("REDUCEè¿ç¨").uid("reduce_js_error_count") 
.addSink(getSinkFunction(parameterTool)).name("è¾åºJS-ERRORå¼å°kafka").uid("sink_js_error_to_kafka"); } /** * å¤çtimingæ°æ®æº * * @param parameterTool * @param data * @throws Exception */ private static void processTimingData(ParameterTool parameterTool, DataStreamSource<XlogStreamBasicBean> data) throws Exception { OutputTag<ConvertBean> pvOutputTag = new OutputTag<ConvertBean>("pv-output-tag") { }; // è·åè¿æ»¤åçTimingæ°æ® SingleOutputStreamOperator<XlogStreamBasicBean> timingStream = data .filter(filter -> TIMING.equalsIgnoreCase(filter.getType()) && !NULL.equalsIgnoreCase(filter.getType())).name("è¿æ»¤TIMINGæ°æ®").uid("filter_timing_data"); // 计ç®å个页é¢çPV timingStream .map(new MapFunction<XlogStreamBasicBean, ConvertBean>() { @Override public ConvertBean map(XlogStreamBasicBean xlogStreamBasicBean) throws Exception { return new ConvertBean(xlogStreamBasicBean.getPage(), 1L); } }).name("PVæ°æ®ç±»å转æ¢").uid("map_data_new") .keyBy(event -> event.page) .timeWindow(Time.minutes(1)) .allowedLateness(Time.minutes(1)) .sideOutputLateData(pvOutputTag) .process(getPvWindowProcessFunction()).name("ç»è®¡PVå¼").uid("process_result_bean_new") .addSink(getSinkFunction(parameterTool)).name("è¾åºPVå°kafka").uid("sink_pv_to_kafka"); DataStream<ConvertBean> sideOutput = timingStream.getSideOutput(pvOutputTag); sideOutput.process(new ProcessFunction<ConvertBean, MetricResultBean>() { @Override public void processElement(ConvertBean value, Context ctx, Collector<MetricResultBean> out) throws Exception { ctx.timerService().currentWatermark(); MetricResultBean metricResultBean = MetricResultBean.builder() .page(value.page) .count(value.count) .dimension(ONE_MINUTE) .name("pv") .statisticsType(METRIC_TYPE_COUNT) .lateData(true) .time(DateTimeUtil.formatDate(ctx.timerService().currentWatermark())).build(); generateId(metricResultBean); out.collect(metricResultBean); } }).name("å¤çPV延è¿æ°æ®").uid("process_pv_late_data") 
.addSink(getSinkFunction(parameterTool)).name("åéPV延è¿æ°æ®å°kafka").uid("send_pv_late_data_to_kafka"); // 计ç®å个页é¢çUV 1åé维度 timingStream .map((MapFunction<XlogStreamBasicBean, Tuple3<String, String, String>>) m -> new Tuple3<>(m.getPage(), m.getCookieId(), ONE_MINUTE)).uid("uv_map_data" + ONE_MINUTE) .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING)).uid("uv-type-convert" + ONE_MINUTE) .keyBy(event -> event.f0) .timeWindow(Time.minutes(1)) .process(getUvProcessWindowFunction(ONE_MINUTE)) .addSink(getSinkFunction(parameterTool)); // 计ç®å个页é¢çUV 5åé维度 timingStream .map((MapFunction<XlogStreamBasicBean, Tuple3<String, String, String>>) m -> new Tuple3<>(m.getPage(), m.getCookieId(), FIVE_MINUTE)).uid("uv_map_data" + FIVE_MINUTE) .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING)).uid("uv-type-convert" + FIVE_MINUTE) .keyBy(event -> event.f0) .timeWindow(Time.minutes(5)) .process(getUvProcessWindowFunction(FIVE_MINUTE)) .addSink(getSinkFunction(parameterTool)); // 计ç®å个页é¢çUV 1å°æ¶ç»´åº¦ timingStream .map((MapFunction<XlogStreamBasicBean, Tuple3<String, String, String>>) m -> new Tuple3<>(m.getPage(), m.getCookieId(), ONE_HOUR)).uid("uv_map_data" + ONE_HOUR) .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING)).uid("uv-type-convert" + ONE_HOUR) .keyBy(event -> event.f0) .timeWindow(Time.hours(1)) .process(getUvProcessWindowFunction(ONE_HOUR)) .addSink(getSinkFunction(parameterTool)); //TODO 计ç®å个页é¢çUV 1天维度 // timingStream // .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<XlogStreamBasicBean>() { // @Override // public long extractAscendingTimestamp(XlogStreamBasicBean element) { // return element.getTimestamp() - 10000; // } // }).uid("set_water_marks_uv" + ONE_HOUR) // .map((MapFunction<XlogStreamBasicBean, Tuple3<String, String, String>>) m -> new Tuple3<>(m.getPage(), m.getCookieId(), ONE_HOUR)).uid("uv_map_data" + ONE_HOUR) // .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING)).uid("uv-type-convert" + 
ONE_HOUR) // .keyBy(event -> event.f0) // .timeWindow(Time.days(1)) // .process(getUvProcessWindowFunction(ONE_HOUR)) // .addSink(getSinkFunction(parameterTool)); } private static ProcessWindowFunction<ConvertBean, MetricResultBean, String, TimeWindow> getPvWindowProcessFunction() { return new ProcessWindowFunction<ConvertBean, MetricResultBean, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<ConvertBean> elements, Collector<MetricResultBean> out) throws Exception { long count = 0; for (ConvertBean element : elements) { count++; } MetricResultBean metricResultBean = MetricResultBean.builder() .page(key) .count(count) .dimension(ONE_MINUTE) .name("pv") .statisticsType(METRIC_TYPE_COUNT) .time(DateTimeUtil.formatDate(context.window().getStart())).build(); generateId(metricResultBean); out.collect(metricResultBean); } }; } /** * 计ç®UVçwindow function * * @return */ private static ProcessWindowFunction<Tuple3<String, String, String>, MetricResultBean, String, TimeWindow> getUvProcessWindowFunction(String dimension) { return new ProcessWindowFunction<Tuple3<String, String, String>, MetricResultBean, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<Tuple3<String, String, String>> elements, Collector<MetricResultBean> out) throws Exception { // ä»ç¶æä¸æ¢å¤ userIdState Map<String, Boolean> userIdState = new HashMap<>(); // ä»ç¶æä¸æ¢å¤ uvState AtomicLong uvState = new AtomicLong(0); String dimension = null; for (Tuple3<String, String, String> element : elements) { if (dimension == null) { dimension = element.f2; } if (!userIdState.containsKey(element.f1)) { userIdState.put(element.f1, null); uvState.getAndIncrement(); } } MetricResultBean metricResultBean = MetricResultBean.builder() .page(key) .count(uvState.get()) .dimension(dimension) .name("uv") .statisticsType(METRIC_TYPE_UNIQUE_COUNT) .time(DateTimeUtil.formatDate(context.window().getStart())).build(); generateId(metricResultBean); 
out.collect(metricResultBean); } }; } /** * è·åkafkaçæ°æ® * * @param parameterTool * @return */ private static FlinkKafkaProducer011<MetricResultBean> getSinkFunction(ParameterTool parameterTool) { return new FlinkKafkaProducer011<>( parameterTool.get("kafka.sink.brokers"), parameterTool.get("kafka.sink.topic"), new MetricResultSchema() ); } /** * çæå¯ä¸ID * * @param bean */ private static void generateId(MetricResultBean bean) { String idStr = bean.getTime() + bean.getDimension() + bean.getName() + bean.getPage(); long hashId = Hashing.murmur3_128(5).hashUnencodedChars(idStr).asLong(); bean.setId(String.valueOf(hashId)); } }
public class KafkaTools { /** * 设置 kafka é ç½® * * @param parameterTool * @return */ public static Properties buildKafkaProps(ParameterTool parameterTool) { Properties props = parameterTool.getProperties(); props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS)); props.put("zookeeper.connect", parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, DEFAULT_KAFKA_ZOOKEEPER_CONNECT)); props.put("group.id", parameterTool.get(KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID)); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "latest"); return props; } public static DataStreamSource<XlogStreamBasicBean> buildSource(StreamExecutionEnvironment env) throws IllegalAccessException { ParameterTool parameter = (ParameterTool) env.getConfig().getGlobalJobParameters(); String topic = parameter.getRequired(PropertiesConstants.METRICS_TOPIC); Long time = parameter.getLong(PropertiesConstants.CONSUMER_FROM_TIME, 0L); return buildSource(env, topic, time); } /** * @param env * @param topic * @param time 订é çæ¶é´ * @return * @throws IllegalAccessException */ public static DataStreamSource<XlogStreamBasicBean> buildSource(StreamExecutionEnvironment env, String topic, Long time) throws IllegalAccessException { ParameterTool parameterTool = (ParameterTool) env.getConfig().getGlobalJobParameters(); Properties props = buildKafkaProps(parameterTool); FlinkKafkaConsumer011<XlogStreamBasicBean> consumer = new FlinkKafkaConsumer011<>( topic, new MetricSchema(), props); consumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<XlogStreamBasicBean>() { @Override public long extractAscendingTimestamp(XlogStreamBasicBean element) { if (element == null || element.getTimestamp() == null) { return System.currentTimeMillis(); } return element.getTimestamp() - 10000; } }); return env.addSource(consumer); } }