??????????????????????????????????????????????????????????
1????????????????????????????????
2????????????????????????????????
??????????????????????????????????????????????????debug??????????????????????????????????????????????????????????????;
??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????15956076613
|
//??????????????????????????????????????????
OutputTag<BaseInfo2> requestStream = new OutputTag<BaseInfo2>("requestStream") {
};
OutputTag<BaseInfo2> answerStream = new OutputTag<BaseInfo2>("answerStream") {
};
SingleOutputStreamOperator<BaseInfo2> tagStream =
completeInfoStream.process(new MyProcessFunction2(requestStream, answerStream));
DataStream<BaseInfo2> requestDataStream =
tagStream.getSideOutput(requestStream).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
.withTimestampAssigner((element, recordTimestamp) ->
element.getEvenTime()));
DataStream<BaseInfo2> answerDataStream =
tagStream.getSideOutput(answerStream).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
.withTimestampAssigner((element, recordTimestamp) ->
element.getEvenTime()));
|
| |
????????
|
|
ccc0606fight...@163.com
|
---- ???????????? ----
| ?????? | L Y<531599...@qq.com.INVALID> |
| ???????? | 2023??5??23?? 01:25 |
| ?????? | user-zh<user-zh@flink.apache.org> |
| ???? | ??????table api????rowtime?????? |
HI??????????????????????????????????????????????????????????????????????????????????
DataStream<MidInfo> midStream = tableEnv.toAppendStream(result,
MidInfo.class);
????????????DataStream??????????????????????????????????????midStream??????????result????????????MidInfo.class????????????????????????????????????????????????????DataStream??????????Table??????????????DataStream????????????????????????????
DataStream<MidInfo> midStream = tableEnv.toAppendStream(result,
MidInfo.class).assignTimestampsAndWatermarks(WatermarkStrategy.<MidInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)->
element.getEventTime()));
????????????????????????????????????????MapFunction??????????????????????????????????????????????????????????????????????????????????????????
L Y
531599...@qq.com
L Y
531599...@qq.com
------------------ ???????? ------------------
??????:
"user-zh"
<ccc0606fight...@163.com>;
????????: 2023??5??22??(??????) ????9:26
??????: "user-zh"<user-zh@flink.apache.org>;
????: "user-zh"<user-zh@flink.apache.org>;
????: ??????table api????rowtime??????
flink????????????1.14
| |
????????
|
|
ccc0606fight...@163.com
|
---- ???????????? ----
| ?????? | L Y<531599...@qq.com.INVALID> |
| ???????? | 2023??5??20?? 01:10 |
| ?????? | user-zh<user-zh@flink.apache.org> |
| ???? | ??????table api????rowtime?????? |
HI??????????
??????????????????midStream??????????????????????????????????????????????midStream????????????????????????????????????????????????????????????????????????
??????
SingleOutputStreamOperator<Event&gt; eventStream = env
.fromElements(
..............
).assignTimestampsAndWatermarks(
WatermarkStrategy.<Event&gt;forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Event&gt;() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
}
)
);
??????????????????????midStream????????????????flink??????
LY
531599...@qq.com
L&nbsp;Y
531599...@qq.com
&nbsp;
------------------&nbsp;????????&nbsp;------------------
??????:
"user-zh"
<ccc0606fight...@163.com&gt;;
????????:&nbsp;2023??5??17??(??????) ????9:28
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
????:&nbsp;table api????rowtime??????
????????????????????????????
| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"),
$("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"),
$("eventTime").rowtime());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
tableEnv.createTemporaryView("midTable1",midTable); Table resulTable =
tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as
minTime\n"
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
"FROM TABLE(CUMULATE(\n"
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
" TABLE
midTable1"+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
//" TABLE "+ midTable
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
" , DESCRIPTOR(eventTime)\n"
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
" , INTERVAL '60' SECOND\n"
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
" , INTERVAL '1' DAY))\n"
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
" GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
??????????????????????????????????????eventTime??rowtime,????????????????sqlQuery??????????????????????Rowtime
timestamp is not defined. Please make sure that a proper TimestampAssigner is
defined and the stream environment uses the EventTime time characteristic
??????????????????????????
| |
????????
|
|
ccc0606fight...@163.com
|
package job;
import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import bean2.BaseInfo2;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import function.MyProcessFunction2;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.time.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;
import static org.apache.flink.table.api.Expressions.$;
/**
* @Author æ
* @Time 2023/5/10 8:32
* ä½¿ç¨ flink processæ¹æ³å¯¹ä¸æ¡æµè¿è¡æå
èèå°ç¶æè®¾ç½®çé®é¢ï¼éç¨æ¸è¿å¼ç´¯è®¡çªå£è¿è¡è®¡ç®
* å
å°ä¸¤æ¡æµè½¬æ¢åç表å
ç®åçå
³èå¨ä¸èµ·ï¼åå
³èçæ¶åå ä¸ä¸ä¸ªäºä»¶æ¶é´ï¼row_time as cast(CURRENT_TIMESTAMP AS
timestamp(3) )ï¼
* ä¹åå¨ä½¿ç¨æ¸è¿å¼çªå£è¿è¡è®¡ç®
*/
public class RytLogAnly4 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//1ãè¿æ¥æµè¯ç¯å¢kafkaçæ°æ®
String servers =
FlinkConfig.config.getProperty("dev_bootstrap.servers");
String topicName = FlinkConfig.config.getProperty("dev_topicName");
String groupId = FlinkConfig.config.getProperty("dev_groupId");
String devMode = FlinkConfig.config.getProperty("dev_mode");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", servers);
prop.setProperty("group.id", groupId);
prop.setProperty("auto.offset.reset", devMode);
DataStreamSource<String> sourceStream = env.addSource(new
FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop));
sourceStream.print("sourceStream");
//{"ip":"10.125.8.141","data":"åºç: -- 14:28:05.111 --
<44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=f7olmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}
//2ãå¯¹æºæ°æ®è¿è¡å¤çï¼çæbaseInfoåºç±»çæ°æ®
SingleOutputStreamOperator<BaseInfo> baseInfoStream =
sourceStream.map(new MapFunction<String, BaseInfo>() {
@Override
public BaseInfo map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
//è·åå°ä¸åçæå¡å¨IP
String serverIp = jsonObject.getString("ip");
//è·åå°ä¸åçdataçæ°æ®
String datas = jsonObject.getString("data");
String[] splits = datas.split("\n");
HashMap<String, String> dataMap = new HashMap<>();
//å°timeå¡«å
å°èªå®ä¹ç±»åä¸ï¼ç¨æ¥å¤æåä¸ä¸ªnumç请æ±ä»¥ååºçæ¶é´
String time = splits[0].substring(7, 19);
//å°subDataå¡«å
å°èªå®ä¹ç±»åä¸ï¼ç¨æ¥å¤ææ¶è¯·æ±è¿æ¯åºç
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
splits[i] = splits[i].replaceFirst("=", "&");
String[] temp = splits[i].split("&");
if (temp.length > 1) {
dataMap.put(temp[0].toLowerCase(), temp[1]);
}
}
}
return new BaseInfo(dataMap.get("action"), serverIp,
DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
}
});
baseInfoStream.print("baseInfoStream");
//3ãä¸è¿°çæ°æ®æµä¸çactionä»
ä»
æ¯æ°åï¼éè¦å
³èä¸ä¸MySQL廿¿å°å¯¹åºçåè½ä¸ææè¿°
SingleOutputStreamOperator<BaseInfo2> completeInfoStream =
baseInfoStream.map(new MapFunction<BaseInfo, BaseInfo2>() {
@Override
public BaseInfo2 map(BaseInfo value) throws Exception {
//æ¿å°æ°æ®ä¸æºå¸¦çæ°åçaction
String actionId = value.getFuncId();
System.out.println("æ°æ®ä¸çactionç¼ç æ¯ï¼ " +
actionId);
String actionName = null;
Connection connection = null;
PreparedStatement ps = null;
//æ ¹æ®æ°æ®çactionå»MySQL䏿¥æ¾å°å¯¹åºçä¸å注é
try {
String sql = "select action_name from ActionType where
action = ?";
connection = JdbcUtil.getConnection();
ps = connection.prepareStatement(sql);
ps.setString(1, actionId);
ResultSet resultSet = ps.executeQuery();
System.out.println("resultSetæ¯" + resultSet);
if (resultSet.next()) {
actionName = resultSet.getString("action_name");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
JdbcUtil.closeResource(connection, ps);
}
return new BaseInfo2(value.getFuncId(), actionName,
value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(),
value.getInfo(), System.currentTimeMillis());
}
}).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element,
recordTimestamp) -> element.getEvenTime()));
completeInfoStream.print("å ä¸ä¸ææè¿°ç completeInfoStream");
//å°å®æ´ä¿¡æ¯çæµæ°æ®ä¾æ®è¯·æ±è¿æ¯åºçè¿è¡æè§£
OutputTag<BaseInfo2> requestStream = new
OutputTag<BaseInfo2>("requestStream") {
};
OutputTag<BaseInfo2> answerStream = new
OutputTag<BaseInfo2>("answerStream") {
};
SingleOutputStreamOperator<BaseInfo2> tagStream =
completeInfoStream.process(new MyProcessFunction2(requestStream, answerStream));
DataStream<BaseInfo2> requestDataStream =
tagStream.getSideOutput(requestStream).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
.withTimestampAssigner((element, recordTimestamp) ->
element.getEvenTime()));
DataStream<BaseInfo2> answerDataStream =
tagStream.getSideOutput(answerStream).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
.withTimestampAssigner((element, recordTimestamp) ->
element.getEvenTime()));
requestDataStream.print("è¯·æ±æµæ¯ requestDataStream");
answerDataStream.print("åºçæµæ¯ answerDataStream");
//6ãè®²ä¸¤æ¡æµè½¬æ¢ä¸ºå¯¹åºç表 è¿è¡å
³èåæå°å¼
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
System.out.println("è¿å
¥æµå¼ç¯å¢å¯");
Table tableRequest =tableEnv.fromDataStream(requestDataStream,
$("funcId"), $("funcIdDesc"), $("serverIp"), $("baseTime"),
$("handleSerialNo"), $("info"), $("eventTime").rowtime());
Table tableAnswer =tableEnv.fromDataStream(answerDataStream,
$("funcId"), $("funcIdDesc"), $("serverIp"), $("baseTime"),
$("handleSerialNo"), $("info"), $("eventTime").rowtime());
Table result = tableEnv.sqlQuery("select \n" +
"\ta.funcId as funcId ,\n" +
"\ta.funcIdDesc as funcIdDesc,\n" +
"\ta.serverIp as serverIp,\n" +
"\tb.baseTime as maxTime,\n" +
"\ta.baseTime as minTime,\t\n" +
"\tconcat(a.funcId,a.serverIp) as pk ,\n" +
" a.eventTime as et\n" +
" from " + tableRequest + " a\n " +
" inner join " + tableAnswer + " b" +
" on a.handleSerialNo=b.handleSerialNo ");
System.out.println("è¿ä¸ªæ¯resultTable" + result);
result.printSchema();
//tableEnv.createTemporaryView("resultTable", result);
DataStream<MidInfo> midStream = tableEnv.toAppendStream(result,
MidInfo.class)
.assignTimestampsAndWatermarks(WatermarkStrategy.<MidInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)->
element.getEt().getTime()));
Table midTable = tableEnv.fromDataStream(midStream, $("funcId"),
$("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"),
$("et").rowtime())
.select($("funcId"), $("funcIdDesc"), $("serverIp"),
$("maxTime"), $("minTime"), $("pk"), $("et"));
midTable.printSchema();
tableEnv.createTemporaryView("midTable1", midTable);
//使ç¨TVFçéç¨æ¸è¿å¼ç´¯è®¡çªå£è¿è¡è®¡ç®
Table resulTable = tableEnv.sqlQuery("SELECT
funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +
"FROM TABLE(CUMULATE(\n" +
" TABLE midTable1" +
//" TABLE "+ midTable +
" , DESCRIPTOR(et)\n" +
" , INTERVAL '60' SECOND\n" +
" , INTERVAL '1' DAY))\n" +
" GROUP BY
window_start,window_end,funcId,funcIdDesc,serverIp,pk");
resulTable.printSchema();
tableEnv.executeSql("select * from "+resulTable).print();
// DataStream<Tuple2<Boolean, ResultInfo>> resultStream =
tableEnv.toRetractStream(resulTable, ResultInfo.class);
// resultStream.print("resultStream");
// SingleOutputStreamOperator<ResultInfo> resultInfoStream =
resultStream.map(new MapFunction<Tuple2<Boolean, ResultInfo>, ResultInfo>() {
// @Override
// public ResultInfo map(Tuple2<Boolean, ResultInfo> value) throws
Exception {
// return value.f1;
// }
// });
// resultInfoStream.print("resultInfoStream");
// resultInfoStream.addSink(new Sink2Mysql());
env.execute();
}
}
package bean;
/**
* @Author æ
* @Time 2023/5/9 18:35
* è¿ä¸ªç¨æ¥å®ä¹åºç¡ä¿¡æ¯
* {"ip":"10.125.8.141","data":"请æ±: -- 14:28:05.111 --
<44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=f7olmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}
*/
public class BaseInfo {
private String funcId;//åè½å·
private String serverIp;//æå¡å¨IP
private Long
baseTime;//ä¿¡æ¯æºå¸¦çæ¶é´ï¼éè¦å°â08:35:25.268âè¿ç§ç±»åçæ¶é´è½¬æ¢ä¸ºlongç±»åï¼
private String handleSerialNo;//客æ·ç¼ç ï¼ç¨æ¥è¯·æ±ä¸åºçè¿è¡å
³èï¼
private String info;//è®°å½è¿ä¸ªæ¥å¿æ¯è¯·æ±è¿æ¯åºç
public BaseInfo() {
}
public BaseInfo(String funcId, String serverIp, Long baseTime, String
handleSerialNo, String info) {
this.funcId = funcId;
this.serverIp = serverIp;
this.baseTime = baseTime;
this.handleSerialNo = handleSerialNo;
this.info = info;
}
public String getFuncId() {
return funcId;
}
public void setFuncId(String funcId) {
this.funcId = funcId;
}
public String getServerIp() {
return serverIp;
}
public void setServerIp(String serverIp) {
this.serverIp = serverIp;
}
public Long getBaseTime() {
return baseTime;
}
public void setBaseTime(Long baseTime) {
this.baseTime = baseTime;
}
public String getHandleSerialNo() {
return handleSerialNo;
}
public void setHandleSerialNo(String handleSerialNo) {
this.handleSerialNo = handleSerialNo;
}
public String getInfo() {
return info;
}
public void setInfo(String info) {
this.info = info;
}
@Override
public String toString() {
return "BaseInfo{" +
"funcId='" + funcId + '\'' +
", serverIp='" + serverIp + '\'' +
", baseTime=" + baseTime +
", handleSerialNo='" + handleSerialNo + '\'' +
", info='" + info + '\'' +
'}';
}
}
package bean2;
/**
* @Author æ
* @Time 2023/5/19 10:36
* ç¨æ¥æåæºæ°æ®ä¸çå 个å¿
è¦å段ï¼å¢å ä¸ä¸ªäºä»¶æ¶é´æ³
*/
public class BaseInfo2 {
private String funcId;//åè½å·
private String funcIdDesc;// è®°å½å
·ä½åè½å·ç䏿æè¿°
private String serverIp;//æå¡å¨IP
private Long
baseTime;//ä¿¡æ¯æºå¸¦çæ¶é´ï¼éè¦å°â08:35:25.268âè¿ç§ç±»åçæ¶é´è½¬æ¢ä¸ºlongç±»åï¼
private String handleSerialNo;//客æ·ç¼ç ï¼ç¨æ¥è¯·æ±ä¸åºçè¿è¡å
³èï¼
private String info;//è®°å½è¿ä¸ªæ¥å¿æ¯è¯·æ±è¿æ¯åºç
private Long evenTime;//以å½åæ¶é´ä½ä¸ºäºä»¶æ¶é´
public BaseInfo2() {
}
public BaseInfo2(String funcId, String funcIdDesc, String serverIp, Long
baseTime, String handleSerialNo, String info, Long evenTime) {
this.funcId = funcId;
this.funcIdDesc = funcIdDesc;
this.serverIp = serverIp;
this.baseTime = baseTime;
this.handleSerialNo = handleSerialNo;
this.info = info;
this.evenTime = evenTime;
}
public String getFuncId() {
return funcId;
}
public void setFuncId(String funcId) {
this.funcId = funcId;
}
public String getFuncIdDesc() {
return funcIdDesc;
}
public void setFuncIdDesc(String funcIdDesc) {
this.funcIdDesc = funcIdDesc;
}
public String getServerIp() {
return serverIp;
}
public void setServerIp(String serverIp) {
this.serverIp = serverIp;
}
public Long getBaseTime() {
return baseTime;
}
public void setBaseTime(Long baseTime) {
this.baseTime = baseTime;
}
public String getHandleSerialNo() {
return handleSerialNo;
}
public void setHandleSerialNo(String handleSerialNo) {
this.handleSerialNo = handleSerialNo;
}
public String getInfo() {
return info;
}
public void setInfo(String info) {
this.info = info;
}
public Long getEvenTime() {
return evenTime;
}
public void setEvenTime(Long evenTime) {
this.evenTime = evenTime;
}
@Override
public String toString() {
return "BaseInfo2{" +
"funcId='" + funcId + '\'' +
", funcIdDesc='" + funcIdDesc + '\'' +
", serverIp='" + serverIp + '\'' +
", baseTime=" + baseTime +
", handleSerialNo='" + handleSerialNo + '\'' +
", info='" + info + '\'' +
", evenTime=" + evenTime +
'}';
}
}
package function;
import bean.BaseInfo;
import bean2.BaseInfo2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* @Author æ
* @Time 2023/5/10 9:24
* å
·ä½çåæµé»è¾å®ç°
*/
public class MyProcessFunction2 extends ProcessFunction<BaseInfo2, BaseInfo2> {
private OutputTag<BaseInfo2> requestStream;
private OutputTag<BaseInfo2> answerStream;
public MyProcessFunction2(OutputTag<BaseInfo2> requestStream,
OutputTag<BaseInfo2> answerStream) {
this.requestStream = requestStream;
this.answerStream = answerStream;
}
@Override
public void processElement(BaseInfo2 value, ProcessFunction<BaseInfo2,
BaseInfo2>.Context ctx, Collector<BaseInfo2> out) throws Exception {
//å°å
å«è¯·æ±çæ°æ®è¾åºå°tagä¸
if (value.getInfo().contains("请æ±")){
ctx.output(requestStream,value);
}
//å°å
å«åºççæ°æ®è¾åºå°tagä¸
if (value.getInfo().contains("åºç")){
ctx.output(answerStream,value);
}
}
}
package bean;
import java.sql.Timestamp;
/**
* @Author æ
* @Time 2023/5/15 14:52
* è¿ä¸ªbeanç±»ç¨æ¥å½ä½ä¸¤å¼ 表å
³èä¹åçç»æbean
*/
public class MidInfo {
private String funcId;
private String funcIdDesc;
private String serverIp;
private Long maxTime;
private Long minTime;
private String pk;
private Timestamp et;
public MidInfo() {
}
public MidInfo(String funcId, String funcIdDesc, String serverIp, Long
maxTime, Long minTime, String pk, Timestamp et) {
this.funcId = funcId;
this.funcIdDesc = funcIdDesc;
this.serverIp = serverIp;
this.maxTime = maxTime;
this.minTime = minTime;
this.pk = pk;
this.et = et;
}
public String getFuncId() {
return funcId;
}
public void setFuncId(String funcId) {
this.funcId = funcId;
}
public String getFuncIdDesc() {
return funcIdDesc;
}
public void setFuncIdDesc(String funcIdDesc) {
this.funcIdDesc = funcIdDesc;
}
public String getServerIp() {
return serverIp;
}
public void setServerIp(String serverIp) {
this.serverIp = serverIp;
}
public Long getMaxTime() {
return maxTime;
}
public void setMaxTime(Long maxTime) {
this.maxTime = maxTime;
}
public Long getMinTime() {
return minTime;
}
public void setMinTime(Long minTime) {
this.minTime = minTime;
}
public String getPk() {
return pk;
}
public void setPk(String pk) {
this.pk = pk;
}
public Timestamp getEt() {
return et;
}
public void setEt(Timestamp et) {
this.et = et;
}
@Override
public String toString() {
return "MidInfo{" +
"funcId='" + funcId + '\'' +
", funcIdDesc='" + funcIdDesc + '\'' +
", serverIp='" + serverIp + '\'' +
", maxTime=" + maxTime +
", minTime=" + minTime +
", pk='" + pk + '\'' +
", et=" + et +
'}';
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>ryi_migrate</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.14.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
<scope>provided</scope>
</dependency>
<!--è¿æ¥Kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>