??????????????????????kafka????????????????????????????????offset????????





------------------ ???????? ------------------
??????:&nbsp;"????"<[email protected]&gt;;
????????:&nbsp;2020??4??7??(??????) ????11:27
??????:&nbsp;"user-zh"<[email protected]&gt;;

????:&nbsp;fink??????????????kafka????????????????????



Hello,
&nbsp;&nbsp;&nbsp; 
????????????????????flink??????????????????????????????????????????????????kafka????????????????????????????????????????????????????????????????????????????????????????????
&nbsp;&nbsp;&nbsp; 
????wartermark????????kafka??consumer??????????????????????process????????????????????????


flink??????1.6.3

??????????????

public static void main(String[] args) throws Exception {
final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
&nbsp;&nbsp;&nbsp; StreamExecutionEnvironment env = 
ExecutionEnvUtil.prepare(parameterTool);

&nbsp;&nbsp;&nbsp; DataStreamSource<XlogStreamBasicBean&gt; data = 
KafkaTools.buildSource(env);
// ????timing????
processTimingData(parameterTool, data);
// ????front error????
processFrontErrorData(parameterTool, data);
// ????img error????
processImgLoadErrorData(parameterTool, data);
&nbsp;&nbsp;&nbsp; env.execute("xlog compute");
}




kafka????????????????
public static Properties buildKafkaProps(ParameterTool parameterTool) {
&nbsp;&nbsp;&nbsp; Properties props = parameterTool.getProperties();
&nbsp;&nbsp;&nbsp; props.put("bootstrap.servers", 
parameterTool.get(KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS));
&nbsp;&nbsp;&nbsp; props.put("zookeeper.connect", 
parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
&nbsp;&nbsp;&nbsp; props.put("group.id", parameterTool.get(KAFKA_GROUP_ID, 
DEFAULT_KAFKA_GROUP_ID));
&nbsp;&nbsp;&nbsp; props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
&nbsp;&nbsp;&nbsp; props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
&nbsp;&nbsp;&nbsp; props.put("auto.offset.reset", "latest");
return props;
}







--

Best Wishes
&nbsp;&nbsp; Galen.K

回复