??????????????????????kafka????????????????????????????????offset????????
------------------ ???????? ------------------ ??????: "????"<[email protected]>; ????????: 2020??4??7??(??????) ????11:27 ??????: "user-zh"<[email protected]>; ????: fink??????????????kafka???????????????????? Hello, ????????????????????flink??????????????????????????????????????????????????kafka???????????????????????????????????????????????????????????????????????????????????????????? ????wartermark????????kafka??consumer??????????????????????process???????????????????????? flink??????1.6.3 ?????????????? public static void main(String[] args) throws Exception { final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args); StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool); DataStreamSource<XlogStreamBasicBean> data = KafkaTools.buildSource(env); // ????timing???? processTimingData(parameterTool, data); // ????front error???? processFrontErrorData(parameterTool, data); // ????img error???? processImgLoadErrorData(parameterTool, data); env.execute("xlog compute"); } kafka???????????????? public static Properties buildKafkaProps(ParameterTool parameterTool) { Properties props = parameterTool.getProperties(); props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS)); props.put("zookeeper.connect", parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, DEFAULT_KAFKA_ZOOKEEPER_CONNECT)); props.put("group.id", parameterTool.get(KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID)); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "latest"); return props; } -- Best Wishes Galen.K
