附件是代码
还有一个问题,键值状态访问,你的代码里是读出了所有key关键的mapstate吗
-- 代码是读出所有map状态的key。
在 2020-04-30 09:40:45,"shx" <[email protected]> 写道:
>能发一下写入状态的代码看一下吗,还有一个问题,键值状态访问,你的代码里是读出了所有key关键的mapstate吗,谢谢
>
>
>
>
>| |
>邵红晓
>|
>|
>邮箱:[email protected]
>|
>
>签名由 网易邮箱大师 定制
>
>在2020年04月30日 09:04,guanyq 写道:
>代码中没特别指定Serializer。都是默认的序列化。
>在 2020-04-29 18:20:22,"Congxian Qiu" <[email protected]> 写道:
>>Hi
>>从错误日志看,是 StateMigration 相关的问题。
>>你需要确认下,你的代码中的 Serializer 和 savepoint 中 state 相关的 serializer
>>是一样的或者是兼容的,你可以参考下这个文档[1]
>>
>>[1]
>>https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/schema_evolution.html
>>
>>Best,
>>Congxian
>>
>>
>>guanyq <[email protected]> 于2020年4月29日周三 下午6:09写道:
>>
>>>
>>> 附件是代码和错误日志。目前不知道如何调查。麻烦帮忙看下 谢谢。
package com.data.processing.entity;
/**
 * Bean describing one order record: five dimension fields (province, in-mode, accept type,
 * site-selection type, pre-judge flag), a numeric step value, the order id and the receive date.
 *
 * <p>The class keeps a public no-argument constructor and a getter/setter pair for every field.
 *
 * @author guanyq
 * @date 2020/4/5
 */
public class OnlineOrderUncompleted {

    private String provinceCode;
    private String inModeCode;
    private String acceptType;
    private String siteSelectionType;
    private String resPrejudgeFlag;
    private Long step;
    private String orderId;
    private String receiveDate;

    /** Creates an instance with every field left {@code null}. */
    public OnlineOrderUncompleted() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param provinceCode      province code
     * @param inModeCode        in-mode code
     * @param acceptType        accept type
     * @param siteSelectionType site-selection type
     * @param resPrejudgeFlag   pre-judge flag
     * @param step              step value carried by this record
     * @param orderId           order id
     * @param receiveDate       receive date as text
     */
    public OnlineOrderUncompleted(String provinceCode, String inModeCode,
            String acceptType, String siteSelectionType, String resPrejudgeFlag, Long step,
            String orderId, String receiveDate) {
        this.orderId = orderId;
        this.receiveDate = receiveDate;
        this.step = step;
        this.provinceCode = provinceCode;
        this.inModeCode = inModeCode;
        this.acceptType = acceptType;
        this.siteSelectionType = siteSelectionType;
        this.resPrejudgeFlag = resPrejudgeFlag;
    }

    public String getProvinceCode() {
        return provinceCode;
    }

    public void setProvinceCode(String provinceCode) {
        this.provinceCode = provinceCode;
    }

    public String getInModeCode() {
        return inModeCode;
    }

    public void setInModeCode(String inModeCode) {
        this.inModeCode = inModeCode;
    }

    public String getAcceptType() {
        return acceptType;
    }

    public void setAcceptType(String acceptType) {
        this.acceptType = acceptType;
    }

    public String getSiteSelectionType() {
        return siteSelectionType;
    }

    public void setSiteSelectionType(String siteSelectionType) {
        this.siteSelectionType = siteSelectionType;
    }

    public String getResPrejudgeFlag() {
        return resPrejudgeFlag;
    }

    public void setResPrejudgeFlag(String resPrejudgeFlag) {
        this.resPrejudgeFlag = resPrejudgeFlag;
    }

    public Long getStep() {
        return step;
    }

    public void setStep(Long step) {
        this.step = step;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getReceiveDate() {
        return receiveDate;
    }

    public void setReceiveDate(String receiveDate) {
        this.receiveDate = receiveDate;
    }

    /**
     * Renders every field as {@code name='value'}; {@code null} fields print as {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("OnlineOrderUncompleted{");
        text.append("provinceCode='").append(provinceCode).append('\'');
        text.append(", inModeCode='").append(inModeCode).append('\'');
        text.append(", acceptType='").append(acceptType).append('\'');
        text.append(", siteSelectionType='").append(siteSelectionType).append('\'');
        text.append(", resPrejudgeFlag='").append(resPrejudgeFlag).append('\'');
        text.append(", step='").append(step).append('\'');
        text.append(", orderId='").append(orderId).append('\'');
        text.append(", receiveDate='").append(receiveDate).append('\'');
        return text.append('}').toString();
    }
}
package com.data.processing.unconditionalacceptance;
import com.alibaba.fastjson.JSONObject;
import com.data.processing.entity.OnlineOrderUncompleted;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
* Computes the volume of formal orders that are still in transit (uncompleted).
*
* @author guanyq
* @date 2020/04/04
*/
public class OnlineOrderUncompletedDataProcess {
// TABLE_NAME value a message must carry to pass PrimaryFilterFunction.
public static final String BROADBAND_ORDER_ACCEPT =
"BROADBAND_ORDER_ACCEPT";
// RES_PREJUDGE_FLAG value for which getAcceptType returns "2"; any other value yields "1".
public static final String SIMPLE_ACCEPTANCE = "E";
// SUBSCRIBE_STATE "9" together with NEXT_DEAL_TAG "0" makes PrimaryRichMapFunction
// remove a tracked order key from its state.
public static final String SUBSCRIBE_STATE_9 = "9";
public static final String NEXT_DEAL_TAG_0 = "0";
// SUBSCRIBE_STATE "0" together with NEXT_DEAL_TAG "T" is handled the same way.
public static final String SUBSCRIBE_STATE_0 = "0";
public static final String NEXT_DEAL_TAG_T = "T";
// Separator used when building the MapState key in PrimaryRichMapFunction.
public static final String CONNECT_CHAR = "|";
// Separator used when building the "ID" field of the output message in FinalRichFunction.
public static final String CONNECT_CHAR_ID = "_";
// Job name passed to env.execute.
public static final String ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS =
"OnlineOrderUncompletedDataProcess";
// Name and uid of the keyed map operator (PrimaryRichMapFunction).
public static final String ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS_1 =
"OnlineOrderUncompletedDataProcess_1";
// Name and uid of the window operator (FinalRichFunction).
public static final String ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS_2 =
"OnlineOrderUncompletedDataProcess_2";
/**
 * Builds and runs the streaming job: Kafka source, filter, map to (order id, message),
 * keyed stateful map, keyed processing-time window, Kafka sink.
 *
 * <p>Required arguments, read through {@link ParameterTool}: {@code handle.day},
 * {@code bootstrap.servers}, {@code consumer.topic}, {@code group.id}, {@code sink.topic},
 * {@code window.time} (seconds), {@code checkpoint.data.uri} and {@code checkpoint.interval}
 * (milliseconds).
 *
 * <p>Any exception raised while building or executing the job is logged (with its stack
 * trace) and not rethrown.
 *
 * @param args command-line arguments in {@code ParameterTool} format
 */
public static void main(String[] args) {
    // log
    Logger LOG = LoggerFactory.getLogger(OnlineOrderUncompletedDataProcess.class);
    try {
        // parameter
        final ParameterTool params = ParameterTool.fromArgs(args);
        final String handleDay = params.getRequired("handle.day");
        final String bootstrapServers = params.getRequired("bootstrap.servers");
        final String consumerTopic = params.getRequired("consumer.topic");
        final String groupId = params.getRequired("group.id");
        final String sinkTopic = params.getRequired("sink.topic");
        final String windowTime = params.getRequired("window.time");
        final String checkpointDataUri = params.getRequired("checkpoint.data.uri");
        final String checkpointInterval = params.getRequired("checkpoint.interval");

        // get env
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.enableCheckpointing(Long.parseLong(checkpointInterval));
        StateBackend backend = new FsStateBackend(checkpointDataUri);
        env.setStateBackend(backend);

        // set kafka topic
        List<String> topicList = new ArrayList<>();
        topicList.add(consumerTopic);
        Properties props = getProperties(bootstrapServers, groupId);

        // get kafka consumer; consumption starts at midnight of handle.day
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(topicList, new SimpleStringSchema(), props);
        long handleDayEpochMilli = getHandleDayEpochMilli(handleDay);
        consumer.setStartFromTimestamp(handleDayEpochMilli);

        // kafka source
        SingleOutputStreamOperator<String> ds =
                env.addSource(consumer).name("source").uid("source");

        // compute the in-transit volume of formal orders: filter
        SingleOutputStreamOperator<String> filter = ds.filter(new PrimaryFilterFunction());

        // format tuple
        SingleOutputStreamOperator<Tuple2<String, String>> primaryMap =
                filter.map(new PrimaryMapFunction());

        // format map
        SingleOutputStreamOperator<OnlineOrderUncompleted> map = primaryMap
                .keyBy(0)
                .map(new PrimaryRichMapFunction(handleDay))
                .name(ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS_1)
                .uid(ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS_1);
        SingleOutputStreamOperator<String> finalStream = map
                .keyBy("provinceCode", "inModeCode", "acceptType",
                        "siteSelectionType", "resPrejudgeFlag")
                .timeWindow(Time.seconds(Long.parseLong(windowTime)))
                .apply(new FinalRichFunction())
                .name(ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS_2)
                .uid(ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS_2);

        // kafka properties
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);

        // kafka producer: record key is TABLE_NAME + ID, value is the JSON message itself.
        // Bytes are encoded as UTF-8 explicitly so the output does not depend on the
        // platform default charset of the task managers.
        KafkaSerializationSchema<String> serializationSchema = (element, timestamp) -> {
            JSONObject jsonMsg = JSONObject.parseObject(element);
            String tableName = jsonMsg.getString("TABLE_NAME");
            String id = jsonMsg.getString("ID");
            return new ProducerRecord<>(
                    sinkTopic,
                    (tableName + id).getBytes(StandardCharsets.UTF_8),
                    element.getBytes(StandardCharsets.UTF_8));
        };
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                sinkTopic,
                serializationSchema,
                properties,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

        // sink to kafka
        finalStream.addSink(kafkaProducer).name("ResultDataSink");

        // execute
        env.execute(ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS);
    } catch (Exception e) {
        LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR");
        // Pass the throwable as well so the stack trace is not lost.
        LOG.error("ERROR_INFO:" + e.getMessage(), e);
    }
}
private static long getHandleDayEpochMilli(String handleDay) {
DateTimeFormatter ftf = DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm:ss");
LocalDateTime parse = LocalDateTime.parse(handleDay + " 00:00:00", ftf);
return
LocalDateTime.from(parse).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
}
/**
 * Assembles the Kafka consumer configuration.
 *
 * @param bootstrapServers value for {@code bootstrap.servers}
 * @param groupId          value for {@code group.id}
 * @return properties with the two arguments plus fixed settings: auto-commit disabled,
 *         offset reset {@code earliest}, and String key/value deserializers
 */
private static Properties getProperties(String bootstrapServers, String groupId) {
    final String stringDeserializer =
            "org.apache.kafka.common.serialization.StringDeserializer";
    Properties consumerConfig = new Properties();
    consumerConfig.setProperty("bootstrap.servers", bootstrapServers);
    consumerConfig.setProperty("group.id", groupId);
    consumerConfig.setProperty("enable.auto.commit", "false");
    consumerConfig.setProperty("auto.offset.reset", "earliest");
    consumerConfig.setProperty("key.deserializer", stringDeserializer);
    consumerConfig.setProperty("value.deserializer", stringDeserializer);
    return consumerConfig;
}
/**
 * Derives the accept type from the pre-judge flag.
 *
 * @param resPrejudgeFlag pre-judge flag, may be {@code null}
 * @return {@code "2"} when the flag equals {@code SIMPLE_ACCEPTANCE}, otherwise {@code "1"}
 */
private static String getAcceptType(String resPrejudgeFlag) {
    return SIMPLE_ACCEPTANCE.equals(resPrejudgeFlag) ? "2" : "1";
}
/**
 * Normalizes the pre-judge flag, substituting a default when it is missing.
 *
 * @param resPrejudgeFlag raw flag value, may be {@code null} or empty
 * @return {@code "W"} when the flag is {@code null} or empty, otherwise the flag unchanged
 */
private static String getResPrejudgeFlag(String resPrejudgeFlag) {
    if (StringUtils.isEmpty(resPrejudgeFlag)) {
        return "W";
    }
    return resPrejudgeFlag;
}
private static class PrimaryMapFunction extends RichMapFunction<String,
Tuple2<String,String>> {
Logger LOG = LoggerFactory.getLogger(PrimaryMapFunction.class);
@Override
public Tuple2<String,String> map(String msg) {
try {
JSONObject jsonMsg = JSONObject.parseObject(msg);
String tableData = jsonMsg.getString("TABLE_DATA");
JSONObject data = JSONObject.parseObject(tableData);
String cbOrderId = data.getString("CB_ORDER_ID");
return new Tuple2<>(cbOrderId,msg);
} catch (Exception e) {
LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR At
PrimaryMapFunction.map");
LOG.error("ERROR_INFO:" + e.getMessage());
LOG.error("ERROR_MSG:" + msg);
return null;
}
}
}
private static class PrimaryRichMapFunction extends
RichMapFunction<Tuple2<String, String>, OnlineOrderUncompleted> {
Logger LOG = LoggerFactory.getLogger(PrimaryRichMapFunction.class);
private String handleDay = "";
public PrimaryRichMapFunction(String handleDay) {
this.handleDay = handleDay;
}
private transient MapState<String, Long> dayUnComputeCnt;
@Override
public void open(Configuration parameters) {
try {
MapStateDescriptor<String, Long> descriptor = new
MapStateDescriptor(
"dayUnComputeCnt",
TypeInformation.of(String.class),
TypeInformation.of(Long.class)
);
dayUnComputeCnt = getRuntimeContext().getMapState(descriptor);
} catch (Exception e) {
LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR At
PrimaryRichMapFunction.open");
LOG.error("ERROR_INFO:" + e.getMessage());
}
}
@Override
public OnlineOrderUncompleted map(Tuple2<String, String> t) {
try {
JSONObject jsonMsg = JSONObject.parseObject(t.f1);
JSONObject data =
JSONObject.parseObject(jsonMsg.getString("TABLE_DATA"));
String provinceCode = data.getString("PROVINCE_CODE");
String inModeCode = data.getString("IN_MODE_CODE");
String siteSelectionType =
data.getString("SITE_SELECTION_TYPE");
String resPrejudgeFlag =
getResPrejudgeFlag(data.getString("RES_PREJUDGE_FLAG"));
String acceptType = getAcceptType(resPrejudgeFlag);
String cbOrderId = data.getString("CB_ORDER_ID");
String receiveDate = data.getString("RECEIVE_DATE");
String subscribeState = data.getString("SUBSCRIBE_STATE");
String nextDealTag = data.getString("NEXT_DEAL_TAG");
String unCompleteKey = provinceCode + CONNECT_CHAR + cbOrderId
+ CONNECT_CHAR + inModeCode + CONNECT_CHAR + acceptType + CONNECT_CHAR +
siteSelectionType + CONNECT_CHAR + resPrejudgeFlag;
long step = 0L;
if
(StringUtils.substring(receiveDate,0,10).compareTo(this.handleDay) > 0) {
if (SUBSCRIBE_STATE_9.equals(subscribeState) &&
NEXT_DEAL_TAG_0.equals(nextDealTag)) {
if (dayUnComputeCnt.contains(unCompleteKey)) {
dayUnComputeCnt.remove(unCompleteKey);
step = step - 1L;
}
} else if (SUBSCRIBE_STATE_0.equals(subscribeState) &&
NEXT_DEAL_TAG_T.equals(nextDealTag)) {
if (dayUnComputeCnt.contains(unCompleteKey)) {
dayUnComputeCnt.remove(unCompleteKey);
step = step - 1L;
}
} else {
if (!dayUnComputeCnt.contains(unCompleteKey)) {
dayUnComputeCnt.put(unCompleteKey, 0L);
step = step + 1L;
}
}
}
return new OnlineOrderUncompleted(provinceCode, inModeCode,
acceptType, siteSelectionType, resPrejudgeFlag, step, cbOrderId, receiveDate);
} catch (Exception e) {
LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR At
PrimaryRichMapFunction.map");
LOG.error("ERROR_INFO:" + e.getMessage());
LOG.error("ERROR_MSG:" + t.toString());
return null;
}
}
}
private static class PrimaryFilterFunction implements
FilterFunction<String> {
Logger LOG =
LoggerFactory.getLogger(OnlineOrderUncompletedDataProcess.class);
@Override
public boolean filter(String msg) {
try {
JSONObject jsonMsg = JSONObject.parseObject(msg);
String tableName = jsonMsg.getString("TABLE_NAME");
JSONObject data =
JSONObject.parseObject(jsonMsg.getString("TABLE_DATA"));
String provinceCode = data.getString("PROVINCE_CODE");
String inModeCode = data.getString("IN_MODE_CODE");
String siteSelectionType =
data.getString("SITE_SELECTION_TYPE");
String cbOrderId = data.getString("CB_ORDER_ID");
String receiveDate = data.getString("RECEIVE_DATE");
if (BROADBAND_ORDER_ACCEPT.equals(tableName)) {
if (StringUtils.isEmpty(provinceCode) ||
StringUtils.isEmpty(inModeCode) ||
StringUtils.isEmpty(siteSelectionType) ||
StringUtils.isEmpty(cbOrderId) ||
StringUtils.isEmpty(receiveDate)) {
return false;
}
return true;
}
return false;
} catch (Exception e) {
LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR At
PrimaryFilterFunction.filter");
LOG.error("ERROR_INFO:" + e.getMessage());
LOG.error("ERROR_MSG:" + msg);
return false;
}
}
}
private static class FinalRichFunction extends
RichWindowFunction<OnlineOrderUncompleted, String, Tuple, TimeWindow> {
Logger LOG = LoggerFactory.getLogger(FinalRichFunction.class);
private transient ValueState<Long> dayComputeCnt;
@Override
public void open(Configuration config) {
try {
ValueStateDescriptor<Long> descriptor =
new ValueStateDescriptor<>("dayComputeCnt",
TypeInformation.of(new TypeHint<Long>() {
}));
dayComputeCnt = getRuntimeContext().getState(descriptor);
} catch (Exception e) {
LOG.error("ONLINE_ORDER_UNCOMPLETED_ERRORï¼At
FinalRichFunctionï¼");
LOG.error("ERROR_INFO:" + e.getMessage());
}
}
@Override
public void apply(Tuple tuple, TimeWindow timeWindow,
Iterable<OnlineOrderUncompleted> iterable, Collector<String> collector) {
try {
JSONObject messageJson = new JSONObject();
String provinceCode = "";
String inModeCode = "";
String acceptType = "";
String siteSelectionType = "";
String resPrejudgeFlag = "";
long sum = 0L;
for (OnlineOrderUncompleted record : iterable) {
provinceCode = record.getProvinceCode();
inModeCode = record.getInModeCode();
acceptType = record.getAcceptType();
siteSelectionType = record.getSiteSelectionType();
resPrejudgeFlag = record.getResPrejudgeFlag();
sum = sum + record.getStep();
}
// ç份|æ¥å
¥æ¹å¼|åçæ¹å¼|éåæ¹å¼|è®¢åææ
String id = provinceCode + CONNECT_CHAR_ID + inModeCode +
CONNECT_CHAR_ID + acceptType + CONNECT_CHAR_ID + siteSelectionType +
CONNECT_CHAR_ID + resPrejudgeFlag;
messageJson.put("TABLE_NAME",
"BROADBAND_NO_REJECTION_UNCOMPLETED_AMOUNT");
messageJson.put("ID", id);
messageJson.put("PROVINCE_CODE", provinceCode);
messageJson.put("IN_MODE_CODE", inModeCode);
messageJson.put("ACCEPT_TYPE", acceptType);
messageJson.put("SITE_SELECTION_TYPE", siteSelectionType);
messageJson.put("RES_PREJUDGE_FLAG", resPrejudgeFlag);
if (dayComputeCnt.value() == null) {
dayComputeCnt.update(sum);
} else {
dayComputeCnt.update(dayComputeCnt.value() + sum);
}
// æ£å¼åå¨éåé
messageJson.put("ONLINE_ORDER_UNCOMPLETED_AMOUNT",
dayComputeCnt.value());
collector.collect(messageJson.toJSONString());
} catch (Exception e) {
LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR At
FinalRichFunction.apply");
LOG.error("ERROR_INFO:" + e.getMessage());
LOG.error("ERROR_MSG:" + iterable.toString());
}
}
}
}