附件是两份主要代码
--
Best Wishes
Galen.K
在 2020-04-07 12:11:07,"酷酷的浑蛋" <[email protected]> 写道:
>是不是代码中设置了从头消费,还有可能提交offset到kafka的代码中设置了false?因为你的代码应该不是全的,所以没法具体看
>
>
>| |
>apache22
>|
>|
>[email protected]
>|
>签名由网易邮箱大师定制
>在2020年4月7日 12:03,苟刚<[email protected]> 写道:
>
>
>
>latest 不是最后消费的位置吗?
>另外我一直不明白的是,如果我不新增新的算子,从savepoint启动是没有问题的。不会从头开始消费,之后新增算子后才会出现这个情况。
>
>
>
>
>
>
>
>
>
>
>--
>
>Best Wishes
>Galen.K
>
>
>
>
>
>在 2020-04-07 11:39:03,"sunfulin" <[email protected]> 写道:
>
>
>
>Hi,
>props.put("auto.offset.reset", "latest");
>是加了这个设置导致的吧
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-04-07 11:27:53,"苟刚" <[email protected]> 写道:
>Hello,
>我遇到一个问题,我用flink做实时统计的时候,每次新增一种计算类型,算子就会从kafka的最早的消息开始消费,导致我每次重启后都需要花费好长的时间去追平记录,请问有什么办法解决吗?
>我的watermark是设置在kafka的consumer上的,下面的每新增一个process的时候都会从头开始消费。
>
>
>flink版本:1.6.3
>
>部分代码如下:
>
>public static void main(String[] args) throws Exception {
>final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
>StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
>
>DataStreamSource<XlogStreamBasicBean> data = KafkaTools.buildSource(env);
>// 处理timing数据
>processTimingData(parameterTool, data);
>// 处理front error数据
>processFrontErrorData(parameterTool, data);
>// 处理img error数据
>processImgLoadErrorData(parameterTool, data);
>env.execute("xlog compute");
>}
>
>
>
>
>kafka的连接参数配置:
>public static Properties buildKafkaProps(ParameterTool parameterTool) {
>Properties props = parameterTool.getProperties();
>props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS,
>DEFAULT_KAFKA_BROKERS));
>props.put("zookeeper.connect", parameterTool.get(KAFKA_ZOOKEEPER_CONNECT,
>DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
>props.put("group.id", parameterTool.get(KAFKA_GROUP_ID,
>DEFAULT_KAFKA_GROUP_ID));
>props.put("key.deserializer",
>"org.apache.kafka.common.serialization.StringDeserializer");
>props.put("value.deserializer",
>"org.apache.kafka.common.serialization.StringDeserializer");
>props.put("auto.offset.reset", "latest");
>return props;
>}
>
>
>
>
>
>
>
>--
>
>Best Wishes
>Galen.K
>
public class StreamFromKafkaV2 {
static Logger logger = LoggerFactory.getLogger(StreamFromKafkaV2.class);
public static void main(String[] args) throws Exception {
final ParameterTool parameterTool =
ExecutionEnvUtil.createParameterTool(args);
StreamExecutionEnvironment env =
ExecutionEnvUtil.prepare(parameterTool);
DataStreamSource<XlogStreamBasicBean> data =
KafkaTools.buildSource(env);
// å¤çtimingæ°æ®
processTimingData(parameterTool, data);
// å¤çfront erroræ°æ®
processFrontErrorData(parameterTool, data);
// å¤çimg erroræ°æ®
processImgLoadErrorData(parameterTool, data);
env.execute("xlog compute");
}
/**
* å¤çå¾çå 载失败çä¿¡æ¯
*
* @param parameterTool
* @param data
* @throws Exception
*/
private static void processImgLoadErrorData(ParameterTool parameterTool,
DataStreamSource<XlogStreamBasicBean> data) throws Exception {
// è·åè¿æ»¤åçIMG_LOAD_ERRORæ°æ®
SingleOutputStreamOperator<XlogStreamBasicBean> frontErrorStream = data
.filter(filter -> DIY.equalsIgnoreCase(filter.getSubType()))
.name("è¿æ»¤IMG_LOAD_ERRORæ°æ®").uid("filter_img_laod_error_data");
// 计ç®å个页é¢çIMGé误æ°é
frontErrorStream
.map(new MapFunction<XlogStreamBasicBean, ConvertBean>() {
@Override
public ConvertBean map(XlogStreamBasicBean
xlogStreamBasicBean) throws Exception {
return new ConvertBean(xlogStreamBasicBean.getPage() +
"|" + xlogStreamBasicBean.getKey(), 1L);
}
}).name("IMG-ERRORæ°æ®ç±»å转æ¢").uid("map_img_load_error_data")
.keyBy(event -> event.page)
.timeWindow(Time.minutes(1))
.reduce((ReduceFunction<ConvertBean>) (value1, value2) ->
value1.add(value2.count), new WindowFunction<ConvertBean, MetricResultBean,
String, TimeWindow>() {
@Override
public void apply(String key, TimeWindow window,
Iterable<ConvertBean> elements, Collector<MetricResultBean> out) throws
Exception {
long count = 0;
for (ConvertBean element : elements) {
count += element.count;
}
String[] pageKey = key.split("|");
MetricResultBean metricResultBean =
MetricResultBean.builder()
.page(pageKey[0])
.count(count)
.dimension(ONE_MINUTE)
.name(pageKey[1])
.statisticsType(METRIC_TYPE_COUNT)
.time(DateTimeUtil.formatDate(window.getStart())).build();
generateId(metricResultBean);
out.collect(metricResultBean);
}
}).name("IMG-REDUCEè¿ç¨").uid("reduce_img_load_error_count")
.addSink(getSinkFunction(parameterTool)).name("è¾åºIMG-LOAD-ERRORå¼å°kafka").uid("sink_img_load_error_to_kafka");
}
/**
* å¤çerrorçä¿¡æ¯
*
* @param parameterTool
* @param data
* @throws Exception
*/
private static void processFrontErrorData(ParameterTool parameterTool,
DataStreamSource<XlogStreamBasicBean> data) throws Exception {
// è·åè¿æ»¤åçFRONT_ERRORæ°æ®
SingleOutputStreamOperator<XlogStreamBasicBean> frontErrorStream = data
.filter(filter ->
FRONT_ERROR.equalsIgnoreCase(filter.getSubType()) &&
!NULL.equalsIgnoreCase(filter.getSubType()))
.name("è¿æ»¤FRONT_ERRORæ°æ®").uid("filter_front_error_data");
// 计ç®å个页é¢çJSé误æ°é
frontErrorStream
.map(new MapFunction<XlogStreamBasicBean, ConvertBean>() {
@Override
public ConvertBean map(XlogStreamBasicBean
xlogStreamBasicBean) throws Exception {
return new ConvertBean(xlogStreamBasicBean.getPage(),
1L);
}
}).name("JSæ°æ®ç±»å转æ¢").uid("map_js_error_data")
.keyBy(event -> event.page)
.timeWindow(Time.minutes(1))
.reduce((ReduceFunction<ConvertBean>) (value1, value2) ->
value1.add(value2.count), new WindowFunction<ConvertBean, MetricResultBean,
String, TimeWindow>() {
@Override
public void apply(String key, TimeWindow window,
Iterable<ConvertBean> elements, Collector<MetricResultBean> out) throws
Exception {
long count = 0;
for (ConvertBean element : elements) {
count += element.count;
}
MetricResultBean metricResultBean =
MetricResultBean.builder()
.page(key)
.count(count)
.dimension(ONE_MINUTE)
.name("jsErrorCount")
.statisticsType(METRIC_TYPE_COUNT)
.time(DateTimeUtil.formatDate(window.getStart())).build();
generateId(metricResultBean);
out.collect(metricResultBean);
}
}).name("REDUCEè¿ç¨").uid("reduce_js_error_count")
.addSink(getSinkFunction(parameterTool)).name("è¾åºJS-ERRORå¼å°kafka").uid("sink_js_error_to_kafka");
}
/**
* å¤çtimingæ°æ®æº
*
* @param parameterTool
* @param data
* @throws Exception
*/
private static void processTimingData(ParameterTool parameterTool,
DataStreamSource<XlogStreamBasicBean> data) throws Exception {
OutputTag<ConvertBean> pvOutputTag = new
OutputTag<ConvertBean>("pv-output-tag") {
};
// è·åè¿æ»¤åçTimingæ°æ®
SingleOutputStreamOperator<XlogStreamBasicBean> timingStream = data
.filter(filter -> TIMING.equalsIgnoreCase(filter.getType()) &&
!NULL.equalsIgnoreCase(filter.getType())).name("è¿æ»¤TIMINGæ°æ®").uid("filter_timing_data");
// 计ç®å个页é¢çPV
timingStream
.map(new MapFunction<XlogStreamBasicBean, ConvertBean>() {
@Override
public ConvertBean map(XlogStreamBasicBean
xlogStreamBasicBean) throws Exception {
return new ConvertBean(xlogStreamBasicBean.getPage(),
1L);
}
}).name("PVæ°æ®ç±»å转æ¢").uid("map_data_new")
.keyBy(event -> event.page)
.timeWindow(Time.minutes(1))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(pvOutputTag)
.process(getPvWindowProcessFunction()).name("ç»è®¡PVå¼").uid("process_result_bean_new")
.addSink(getSinkFunction(parameterTool)).name("è¾åºPVå°kafka").uid("sink_pv_to_kafka");
DataStream<ConvertBean> sideOutput =
timingStream.getSideOutput(pvOutputTag);
sideOutput.process(new ProcessFunction<ConvertBean, MetricResultBean>()
{
@Override
public void processElement(ConvertBean value, Context ctx,
Collector<MetricResultBean> out) throws Exception {
ctx.timerService().currentWatermark();
MetricResultBean metricResultBean = MetricResultBean.builder()
.page(value.page)
.count(value.count)
.dimension(ONE_MINUTE)
.name("pv")
.statisticsType(METRIC_TYPE_COUNT)
.lateData(true)
.time(DateTimeUtil.formatDate(ctx.timerService().currentWatermark())).build();
generateId(metricResultBean);
out.collect(metricResultBean);
}
}).name("å¤çPVå»¶è¿æ°æ®").uid("process_pv_late_data")
.addSink(getSinkFunction(parameterTool)).name("åéPVå»¶è¿æ°æ®å°kafka").uid("send_pv_late_data_to_kafka");
// 计ç®å个页é¢çUV 1åé维度
timingStream
.map((MapFunction<XlogStreamBasicBean, Tuple3<String, String,
String>>) m -> new Tuple3<>(m.getPage(), m.getCookieId(),
ONE_MINUTE)).uid("uv_map_data" + ONE_MINUTE)
.returns(Types.TUPLE(Types.STRING, Types.STRING,
Types.STRING)).uid("uv-type-convert" + ONE_MINUTE)
.keyBy(event -> event.f0)
.timeWindow(Time.minutes(1))
.process(getUvProcessWindowFunction(ONE_MINUTE))
.addSink(getSinkFunction(parameterTool));
// 计ç®å个页é¢çUV 5åé维度
timingStream
.map((MapFunction<XlogStreamBasicBean, Tuple3<String, String,
String>>) m -> new Tuple3<>(m.getPage(), m.getCookieId(),
FIVE_MINUTE)).uid("uv_map_data" + FIVE_MINUTE)
.returns(Types.TUPLE(Types.STRING, Types.STRING,
Types.STRING)).uid("uv-type-convert" + FIVE_MINUTE)
.keyBy(event -> event.f0)
.timeWindow(Time.minutes(5))
.process(getUvProcessWindowFunction(FIVE_MINUTE))
.addSink(getSinkFunction(parameterTool));
// 计ç®å个页é¢çUV 1å°æ¶ç»´åº¦
timingStream
.map((MapFunction<XlogStreamBasicBean, Tuple3<String, String,
String>>) m -> new Tuple3<>(m.getPage(), m.getCookieId(),
ONE_HOUR)).uid("uv_map_data" + ONE_HOUR)
.returns(Types.TUPLE(Types.STRING, Types.STRING,
Types.STRING)).uid("uv-type-convert" + ONE_HOUR)
.keyBy(event -> event.f0)
.timeWindow(Time.hours(1))
.process(getUvProcessWindowFunction(ONE_HOUR))
.addSink(getSinkFunction(parameterTool));
//TODO 计ç®å个页é¢çUV 1天维度
// timingStream
// .assignTimestampsAndWatermarks(new
AscendingTimestampExtractor<XlogStreamBasicBean>() {
// @Override
// public long extractAscendingTimestamp(XlogStreamBasicBean
element) {
// return element.getTimestamp() - 10000;
// }
// }).uid("set_water_marks_uv" + ONE_HOUR)
// .map((MapFunction<XlogStreamBasicBean, Tuple3<String, String,
String>>) m -> new Tuple3<>(m.getPage(), m.getCookieId(),
ONE_HOUR)).uid("uv_map_data" + ONE_HOUR)
// .returns(Types.TUPLE(Types.STRING, Types.STRING,
Types.STRING)).uid("uv-type-convert" + ONE_HOUR)
// .keyBy(event -> event.f0)
// .timeWindow(Time.days(1))
// .process(getUvProcessWindowFunction(ONE_HOUR))
// .addSink(getSinkFunction(parameterTool));
}
private static ProcessWindowFunction<ConvertBean, MetricResultBean, String,
TimeWindow> getPvWindowProcessFunction() {
return new ProcessWindowFunction<ConvertBean, MetricResultBean, String,
TimeWindow>() {
@Override
public void process(String key, Context context,
Iterable<ConvertBean> elements, Collector<MetricResultBean> out) throws
Exception {
long count = 0;
for (ConvertBean element : elements) {
count++;
}
MetricResultBean metricResultBean = MetricResultBean.builder()
.page(key)
.count(count)
.dimension(ONE_MINUTE)
.name("pv")
.statisticsType(METRIC_TYPE_COUNT)
.time(DateTimeUtil.formatDate(context.window().getStart())).build();
generateId(metricResultBean);
out.collect(metricResultBean);
}
};
}
/**
* 计ç®UVçwindow function
*
* @return
*/
private static ProcessWindowFunction<Tuple3<String, String, String>,
MetricResultBean, String, TimeWindow> getUvProcessWindowFunction(String
dimension) {
return new ProcessWindowFunction<Tuple3<String, String, String>,
MetricResultBean, String, TimeWindow>() {
@Override
public void process(String key, Context context,
Iterable<Tuple3<String, String, String>> elements, Collector<MetricResultBean>
out) throws Exception {
// ä»ç¶æä¸æ¢å¤ userIdState
Map<String, Boolean> userIdState = new HashMap<>();
// ä»ç¶æä¸æ¢å¤ uvState
AtomicLong uvState = new AtomicLong(0);
String dimension = null;
for (Tuple3<String, String, String> element : elements) {
if (dimension == null) {
dimension = element.f2;
}
if (!userIdState.containsKey(element.f1)) {
userIdState.put(element.f1, null);
uvState.getAndIncrement();
}
}
MetricResultBean metricResultBean = MetricResultBean.builder()
.page(key)
.count(uvState.get())
.dimension(dimension)
.name("uv")
.statisticsType(METRIC_TYPE_UNIQUE_COUNT)
.time(DateTimeUtil.formatDate(context.window().getStart())).build();
generateId(metricResultBean);
out.collect(metricResultBean);
}
};
}
/**
* è·åkafkaçæ°æ®
*
* @param parameterTool
* @return
*/
private static FlinkKafkaProducer011<MetricResultBean>
getSinkFunction(ParameterTool parameterTool) {
return new FlinkKafkaProducer011<>(
parameterTool.get("kafka.sink.brokers"),
parameterTool.get("kafka.sink.topic"), new MetricResultSchema()
);
}
/**
* çæå¯ä¸ID
*
* @param bean
*/
private static void generateId(MetricResultBean bean) {
String idStr = bean.getTime() + bean.getDimension() + bean.getName() +
bean.getPage();
long hashId = Hashing.murmur3_128(5).hashUnencodedChars(idStr).asLong();
bean.setId(String.valueOf(hashId));
}
}
public class KafkaTools {

    /**
     * Builds the Kafka consumer properties from the job parameters.
     *
     * @param parameterTool job configuration (brokers, zookeeper, group id)
     * @return properties for the Flink Kafka consumer
     */
    public static Properties buildKafkaProps(ParameterTool parameterTool) {
        Properties props = parameterTool.getProperties();
        props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS,
                DEFAULT_KAFKA_BROKERS));
        // Ignored by the 0.11 consumer API, kept for older tooling compatibility.
        props.put("zookeeper.connect",
                parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
        props.put("group.id", parameterTool.get(KAFKA_GROUP_ID,
                DEFAULT_KAFKA_GROUP_ID));
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Only takes effect when the group has NO committed offset; it does not
        // override explicit start positions or offsets restored from state.
        props.put("auto.offset.reset", "latest");
        return props;
    }

    /**
     * Builds the xlog source using the topic and optional start timestamp from
     * the environment's global job parameters.
     *
     * @param env execution environment carrying the ParameterTool
     * @return the Kafka-backed source stream
     * @throws IllegalAccessException propagated from the overload
     */
    public static DataStreamSource<XlogStreamBasicBean> buildSource(StreamExecutionEnvironment env) throws IllegalAccessException {
        ParameterTool parameter = (ParameterTool) env.getConfig().getGlobalJobParameters();
        String topic = parameter.getRequired(PropertiesConstants.METRICS_TOPIC);
        Long time = parameter.getLong(PropertiesConstants.CONSUMER_FROM_TIME, 0L);
        return buildSource(env, topic, time);
    }

    /**
     * Builds a Kafka 0.11 source for the given topic, assigning ascending
     * event-time timestamps/watermarks on the consumer.
     *
     * @param env   execution environment carrying the ParameterTool
     * @param topic Kafka topic to subscribe to
     * @param time  epoch-millis to start consuming from; {@code <= 0} means
     *              "use committed group offsets / auto.offset.reset"
     * @return the Kafka-backed source stream
     * @throws IllegalAccessException kept for interface compatibility
     */
    public static DataStreamSource<XlogStreamBasicBean> buildSource(StreamExecutionEnvironment env,
                                                                    String topic, Long time) throws IllegalAccessException {
        ParameterTool parameterTool = (ParameterTool) env.getConfig().getGlobalJobParameters();
        Properties props = buildKafkaProps(parameterTool);
        FlinkKafkaConsumer011<XlogStreamBasicBean> consumer = new FlinkKafkaConsumer011<>(
                topic,
                new MetricSchema(),
                props);
        // BUG FIX: `time` was accepted but never applied, so CONSUMER_FROM_TIME was
        // silently ignored. Apply it when set. NOTE(review): on a restore from
        // savepoint/checkpoint the offsets in state still win over this setting —
        // confirm that is the intended precedence.
        if (time != null && time > 0) {
            consumer.setStartFromTimestamp(time);
        }
        consumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<XlogStreamBasicBean>() {
            @Override
            public long extractAscendingTimestamp(XlogStreamBasicBean element) {
                // Fall back to wall-clock time when the event carries no timestamp.
                if (element == null || element.getTimestamp() == null) {
                    return System.currentTimeMillis();
                }
                // 10s backoff to tolerate slightly out-of-order events.
                return element.getTimestamp() - 10000;
            }
        });
        return env.addSource(consumer);
    }
}