??????????????????????????????????????????????????????????
1????????????????????????????????
2????????????????????????????????
??????????????????????????????????????????????????debug??????????????????????????????????????????????????????????????;
 
??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????15956076613
|
//??????????????????????????????????????????
OutputTag<BaseInfo2> requestStream = new OutputTag<BaseInfo2>("requestStream") {
};
OutputTag<BaseInfo2> answerStream = new OutputTag<BaseInfo2>("answerStream") {
};

SingleOutputStreamOperator<BaseInfo2> tagStream = 
completeInfoStream.process(new MyProcessFunction2(requestStream, answerStream));
DataStream<BaseInfo2> requestDataStream = 
tagStream.getSideOutput(requestStream).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
        .withTimestampAssigner((element, recordTimestamp) -> 
element.getEvenTime()));
DataStream<BaseInfo2> answerDataStream = 
tagStream.getSideOutput(answerStream).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
        .withTimestampAssigner((element, recordTimestamp) -> 
element.getEvenTime()));
|


| |
????????
|
|
ccc0606fight...@163.com
|
---- ???????????? ----
| ?????? | L Y<531599...@qq.com.INVALID> |
| ???????? | 2023??5??23?? 01:25 |
| ?????? | user-zh<user-zh@flink.apache.org> |
| ???? | ??????table api????rowtime?????? |
HI??????????????????????????????????????????????????????????????????????????????????


DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, MidInfo.class);


????????????DataStream??????????????????????????????????????midStream??????????result????????????MidInfo.class????????????????????????????????????????????????????DataStream??????????Table??????????????DataStream????????????????????????????


DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, MidInfo.class)
        .assignTimestampsAndWatermarks(WatermarkStrategy.<MidInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3L))
                .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime()));


????????????????????????????????????????MapFunction??????????????????????????????????????????????????????????????????????????????????????????


L Y


531599...@qq.com





L&nbsp;Y
531599...@qq.com



&nbsp;




------------------&nbsp;????????&nbsp;------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<ccc0606fight...@163.com&gt;;
????????:&nbsp;2023??5??22??(??????) ????9:26
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
????:&nbsp;??????table api????rowtime??????



flink????????????1.14


| |
????????
|
|
ccc0606fight...@163.com
|
---- ???????????? ----
| ?????? | L Y<531599...@qq.com.INVALID&gt; |
| ???????? | 2023??5??20?? 01:10 |
| ?????? | user-zh<user-zh@flink.apache.org&gt; |
| ???? | ??????table api????rowtime?????? |
HI??????????
??????????????????midStream??????????????????????????????????????????????midStream????????????????????????????????????????????????????????????????????????
??????


SingleOutputStreamOperator<Event> eventStream = env
        .fromElements(
                ..............
        ).assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event event, long l) {
                                        return event.timestamp;
                                    }
                                }
                        )
        );





??????????????????????midStream????????????????flink??????


LY
531599...@qq.com




L&amp;nbsp;Y
531599...@qq.com



&amp;nbsp;




------------------&amp;nbsp;????????&amp;nbsp;------------------
??????:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 
"user-zh"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 <ccc0606fight...@163.com&amp;gt;;
????????:&amp;nbsp;2023??5??17??(??????) ????9:28
??????:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;

????:&amp;nbsp;table api????rowtime??????



????????????????????????????

| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"),
$("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"),
$("eventTime").rowtime());
tableEnv.createTemporaryView("midTable1", midTable);
Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n"
        + "FROM TABLE(CUMULATE(\n"
        + " TABLE midTable1"
        //" TABLE "+ midTable
        + " , DESCRIPTOR(eventTime)\n"
        + " , INTERVAL '60' SECOND\n"
        + " , INTERVAL '1' DAY))\n"
        + " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
??????????????????????????????????????eventTime??rowtime,????????????????sqlQuery??????????????????????Rowtime
 timestamp is not defined. Please make sure that a proper TimestampAssigner is 
defined and the stream environment uses the EventTime time characteristic
??????????????????????????
| |
????????
|
|
ccc0606fight...@163.com
|
package job;

import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import bean2.BaseInfo2;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import function.MyProcessFunction2;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.time.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Author 昌
 * @Time 2023/5/10 8:32
 * Splits one raw log stream into request/answer side outputs with Flink's
 * process(), then joins the two derived tables and aggregates with a
 * cumulative (CUMULATE) window to keep state bounded.
 * The joined table carries an event-time column; the progressive cumulative
 * window is applied on top of it afterwards.
 */
public class RytLogAnly4 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // NOTE(review): deprecated since Flink 1.12 — event time is the default there; kept for behavior parity.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 1. Read the raw log records from the dev-environment Kafka cluster.
        String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
        String topicName = FlinkConfig.config.getProperty("dev_topicName");
        String groupId = FlinkConfig.config.getProperty("dev_groupId");
        String devMode = FlinkConfig.config.getProperty("dev_mode");
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", servers);
        prop.setProperty("group.id", groupId);
        prop.setProperty("auto.offset.reset", devMode);
        DataStreamSource<String> sourceStream = env.addSource(new FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop));
        sourceStream.print("sourceStream");
        // Sample record shape:
        // {"ip":"10.125.8.141","data":"应答: -- 14:28:05.111 -- <44.15050>1D971BEEF138370\nAction=100\nMobileCode=...\nHandleSerialNo=...\n...key=value lines...\n"}

        // 2. Parse each raw JSON record into a BaseInfo.
        SingleOutputStreamOperator<BaseInfo> baseInfoStream = sourceStream.map(new MapFunction<String, BaseInfo>() {
            @Override
            public BaseInfo map(String value) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                // IP of the server that produced the line.
                String serverIp = jsonObject.getString("ip");
                // Payload: "请求/应答: -- HH:mm:ss.SSS -- ..." followed by key=value lines.
                String datas = jsonObject.getString("data");

                String[] splits = datas.split("\n");
                HashMap<String, String> dataMap = new HashMap<>();
                // Record time (chars 7..18 of the header line); used to pair a
                // request with its answer for the same serial number.
                String time = splits[0].substring(7, 19);
                // First 10 chars mark whether this line is a request or an answer.
                String subData = datas.substring(0, 10);
                for (int i = 0; i < splits.length; i++) {
                    if (splits[i].contains("=")) {
                        // Only the first '=' separates key from value; later '=' belong to the value.
                        splits[i] = splits[i].replaceFirst("=", "&");
                        String[] temp = splits[i].split("&");
                        if (temp.length > 1) {
                            dataMap.put(temp[0].toLowerCase(), temp[1]);
                        }
                    }
                }
                return new BaseInfo(dataMap.get("action"), serverIp, DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
            }
        });
        baseInfoStream.print("baseInfoStream");

        // 3. The action in the stream is only a numeric code — look up its
        //    Chinese description in MySQL.
        //    NOTE(review): one JDBC query per record; consider caching the lookup table.
        SingleOutputStreamOperator<BaseInfo2> completeInfoStream = baseInfoStream.map(new MapFunction<BaseInfo, BaseInfo2>() {
            @Override
            public BaseInfo2 map(BaseInfo value) throws Exception {
                // Numeric action code carried by the record.
                String actionId = value.getFuncId();
                System.out.println("数据中的action编码是: " + actionId);
                String actionName = null;
                Connection connection = null;
                PreparedStatement ps = null;

                // Resolve the action code to its description via MySQL.
                try {
                    String sql = "select action_name from ActionType where action = ?";
                    connection = JdbcUtil.getConnection();
                    ps = connection.prepareStatement(sql);
                    ps.setString(1, actionId);
                    ResultSet resultSet = ps.executeQuery();
                    System.out.println("resultSet是" + resultSet);
                    if (resultSet.next()) {
                        actionName = resultSet.getString("action_name");
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    JdbcUtil.closeResource(connection, ps);
                }
                // evenTime (sic) is the current wall-clock time at enrichment.
                return new BaseInfo2(value.getFuncId(), actionName, value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), System.currentTimeMillis());
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()));
        completeInfoStream.print("加上中文描述的 completeInfoStream");

        // Split the enriched stream into request / answer side outputs.
        OutputTag<BaseInfo2> requestStream = new OutputTag<BaseInfo2>("requestStream") {
        };
        OutputTag<BaseInfo2> answerStream = new OutputTag<BaseInfo2>("answerStream") {
        };

        SingleOutputStreamOperator<BaseInfo2> tagStream = completeInfoStream.process(new MyProcessFunction2(requestStream, answerStream));
        DataStream<BaseInfo2> requestDataStream = tagStream.getSideOutput(requestStream).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
                .withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()));
        DataStream<BaseInfo2> answerDataStream = tagStream.getSideOutput(answerStream).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
                .withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()));

        requestDataStream.print("请求流是 requestDataStream");
        answerDataStream.print("应答流是 answerDataStream");

        // 6. Convert the two streams into tables and join them; the request time
        //    becomes minTime and the answer time maxTime.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        System.out.println("进入流式环境咯");

        Table tableRequest =tableEnv.fromDataStream(requestDataStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("baseTime"), $("handleSerialNo"), $("info"), $("eventTime").rowtime());

        Table tableAnswer =tableEnv.fromDataStream(answerDataStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("baseTime"), $("handleSerialNo"), $("info"), $("eventTime").rowtime());

        // Pair each request with its answer on the handle serial number.
        Table result = tableEnv.sqlQuery("select \n" +
                "\ta.funcId as funcId ,\n" +
                "\ta.funcIdDesc as funcIdDesc,\n" +
                "\ta.serverIp as serverIp,\n" +
                "\tb.baseTime as maxTime,\n" +
                "\ta.baseTime as minTime,\t\n" +
                "\tconcat(a.funcId,a.serverIp) as pk ,\n" +
                " a.eventTime  as et\n" +
                " from " + tableRequest + " a\n " +
                " inner join " + tableAnswer + " b" +
                " on a.handleSerialNo=b.handleSerialNo ");
        System.out.println("这个是resultTable" + result);
        result.printSchema();
        //tableEnv.createTemporaryView("resultTable", result);

        // Back to a DataStream so a fresh rowtime can be assigned from et.
        DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, MidInfo.class)
                    .assignTimestampsAndWatermarks(WatermarkStrategy.<MidInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)-> element.getEt().getTime()));
        Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("et").rowtime())
                .select($("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("et"));
        midTable.printSchema();
        tableEnv.createTemporaryView("midTable1", midTable);

        // Aggregate with a progressive cumulative window (CUMULATE TVF):
        // 60-second steps accumulating over a 1-day max window.
        Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +
                "FROM TABLE(CUMULATE(\n" +
                " TABLE midTable1" +
                //" TABLE "+ midTable +
                " , DESCRIPTOR(et)\n" +
                " , INTERVAL '60' SECOND\n" +
                " , INTERVAL '1' DAY))\n" +
                " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk");
        resulTable.printSchema();
        tableEnv.executeSql("select * from "+resulTable).print();

//        DataStream<Tuple2<Boolean, ResultInfo>> resultStream = tableEnv.toRetractStream(resulTable, ResultInfo.class);
//        resultStream.print("resultStream");
//        SingleOutputStreamOperator<ResultInfo> resultInfoStream = resultStream.map(new MapFunction<Tuple2<Boolean, ResultInfo>, ResultInfo>() {
//            @Override
//            public ResultInfo map(Tuple2<Boolean, ResultInfo> value) throws Exception {
//                return value.f1;
//            }
//        });
//        resultInfoStream.print("resultInfoStream");
//        resultInfoStream.addSink(new Sink2Mysql());

        env.execute();
    }
}

package bean;

/**
 * @Author 昌
 * @Time 2023/5/9 18:35
 * Basic record parsed from one raw log line.
 * Example payload: {"ip":"10.125.8.141","data":"请求: -- 14:28:05.111 -- <44.15050>...\nAction=100\n...key=value lines...\n"}
 */
public class BaseInfo {
    // Numeric function code ("action") carried by the log line.
    private String funcId;
    // IP of the server that produced the line.
    private String serverIp;
    // Time carried by the record, converted from "08:35:25.268"-style text to a long.
    private Long baseTime;
    // Serial number used to correlate a request with its answer.
    private String handleSerialNo;
    // Marks whether this log line is a request or an answer.
    private String info;

    public BaseInfo() {
    }

    public BaseInfo(String funcId, String serverIp, Long baseTime, String handleSerialNo, String info) {
        this.funcId = funcId;
        this.serverIp = serverIp;
        this.baseTime = baseTime;
        this.handleSerialNo = handleSerialNo;
        this.info = info;
    }

    public String getFuncId() {
        return this.funcId;
    }

    public void setFuncId(String funcId) {
        this.funcId = funcId;
    }

    public String getServerIp() {
        return this.serverIp;
    }

    public void setServerIp(String serverIp) {
        this.serverIp = serverIp;
    }

    public Long getBaseTime() {
        return this.baseTime;
    }

    public void setBaseTime(Long baseTime) {
        this.baseTime = baseTime;
    }

    public String getHandleSerialNo() {
        return this.handleSerialNo;
    }

    public void setHandleSerialNo(String handleSerialNo) {
        this.handleSerialNo = handleSerialNo;
    }

    public String getInfo() {
        return this.info;
    }

    public void setInfo(String info) {
        this.info = info;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("BaseInfo{");
        sb.append("funcId='").append(funcId).append('\'');
        sb.append(", serverIp='").append(serverIp).append('\'');
        sb.append(", baseTime=").append(baseTime);
        sb.append(", handleSerialNo='").append(handleSerialNo).append('\'');
        sb.append(", info='").append(info).append('\'');
        sb.append('}');
        return sb.toString();
    }
}
package bean2;

/**
 * @Author 昌
 * @Time 2023/5/19 10:36
 * Enriched log record: the essential fields of the raw record plus the
 * Chinese description of the function code and an event timestamp.
 */
public class BaseInfo2 {

    // Numeric function code ("action").
    private String funcId;
    // Chinese description of the function code.
    private String funcIdDesc;
    // IP of the server that produced the record.
    private String serverIp;
    // Time carried by the record ("08:35:25.268"-style text converted to a long).
    private Long baseTime;
    // Serial number used to correlate a request with its answer.
    private String handleSerialNo;
    // Marks whether this record is a request or an answer.
    private String info;
    // Event time: the current wall-clock time taken at enrichment.
    // NOTE(review): spelled "evenTime" (sic) in the original; kept because callers use getEvenTime().
    private Long evenTime;

    public BaseInfo2() {
    }

    public BaseInfo2(String funcId, String funcIdDesc, String serverIp, Long baseTime,
                     String handleSerialNo, String info, Long evenTime) {
        this.funcId = funcId;
        this.funcIdDesc = funcIdDesc;
        this.serverIp = serverIp;
        this.baseTime = baseTime;
        this.handleSerialNo = handleSerialNo;
        this.info = info;
        this.evenTime = evenTime;
    }

    public String getFuncId() {
        return this.funcId;
    }

    public void setFuncId(String funcId) {
        this.funcId = funcId;
    }

    public String getFuncIdDesc() {
        return this.funcIdDesc;
    }

    public void setFuncIdDesc(String funcIdDesc) {
        this.funcIdDesc = funcIdDesc;
    }

    public String getServerIp() {
        return this.serverIp;
    }

    public void setServerIp(String serverIp) {
        this.serverIp = serverIp;
    }

    public Long getBaseTime() {
        return this.baseTime;
    }

    public void setBaseTime(Long baseTime) {
        this.baseTime = baseTime;
    }

    public String getHandleSerialNo() {
        return this.handleSerialNo;
    }

    public void setHandleSerialNo(String handleSerialNo) {
        this.handleSerialNo = handleSerialNo;
    }

    public String getInfo() {
        return this.info;
    }

    public void setInfo(String info) {
        this.info = info;
    }

    public Long getEvenTime() {
        return this.evenTime;
    }

    public void setEvenTime(Long evenTime) {
        this.evenTime = evenTime;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("BaseInfo2{");
        sb.append("funcId='").append(funcId).append('\'');
        sb.append(", funcIdDesc='").append(funcIdDesc).append('\'');
        sb.append(", serverIp='").append(serverIp).append('\'');
        sb.append(", baseTime=").append(baseTime);
        sb.append(", handleSerialNo='").append(handleSerialNo).append('\'');
        sb.append(", info='").append(info).append('\'');
        sb.append(", evenTime=").append(evenTime);
        sb.append('}');
        return sb.toString();
    }
}
package function;

import bean.BaseInfo;
import bean2.BaseInfo2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @Author 昌
 * @Time 2023/5/10 9:24
 * Splitting logic: routes each enriched record to the request and/or answer
 * side output depending on the marker carried in its info field.
 */
public class MyProcessFunction2 extends ProcessFunction<BaseInfo2, BaseInfo2> {

    // Side-output tag that receives records marked as requests.
    private OutputTag<BaseInfo2> requestStream;
    // Side-output tag that receives records marked as answers.
    private OutputTag<BaseInfo2> answerStream;

    public MyProcessFunction2(OutputTag<BaseInfo2> requestStream, OutputTag<BaseInfo2> answerStream) {
        this.requestStream = requestStream;
        this.answerStream = answerStream;
    }

    @Override
    public void processElement(BaseInfo2 value, ProcessFunction<BaseInfo2, BaseInfo2>.Context ctx,
                               Collector<BaseInfo2> out) throws Exception {
        String marker = value.getInfo();
        // A record whose marker contains "请求" (request) goes to the request tag.
        if (marker.contains("请求")) {
            ctx.output(requestStream, value);
        }
        // A record whose marker contains "应答" (answer) goes to the answer tag;
        // the two checks are independent, matching the original behavior.
        if (marker.contains("应答")) {
            ctx.output(answerStream, value);
        }
    }
}
package bean;

import java.sql.Timestamp;

/**
 * @Author 昌
 * @Time 2023/5/15 14:52
 * 这个bean类用来当作两张表关联之后的结果bean
 */
public class MidInfo {
    private String funcId;
    private String funcIdDesc;
    private String serverIp;
    private Long maxTime;
    private Long minTime;
    private String pk;
    private Timestamp  et;

    public MidInfo() {
    }

    public MidInfo(String funcId, String funcIdDesc, String serverIp, Long 
maxTime, Long minTime, String pk, Timestamp et) {
        this.funcId = funcId;
        this.funcIdDesc = funcIdDesc;
        this.serverIp = serverIp;
        this.maxTime = maxTime;
        this.minTime = minTime;
        this.pk = pk;
        this.et = et;
    }

    public String getFuncId() {
        return funcId;
    }

    public void setFuncId(String funcId) {
        this.funcId = funcId;
    }

    public String getFuncIdDesc() {
        return funcIdDesc;
    }

    public void setFuncIdDesc(String funcIdDesc) {
        this.funcIdDesc = funcIdDesc;
    }

    public String getServerIp() {
        return serverIp;
    }

    public void setServerIp(String serverIp) {
        this.serverIp = serverIp;
    }

    public Long getMaxTime() {
        return maxTime;
    }

    public void setMaxTime(Long maxTime) {
        this.maxTime = maxTime;
    }

    public Long getMinTime() {
        return minTime;
    }

    public void setMinTime(Long minTime) {
        this.minTime = minTime;
    }

    public String getPk() {
        return pk;
    }

    public void setPk(String pk) {
        this.pk = pk;
    }

    public Timestamp getEt() {
        return et;
    }

    public void setEt(Timestamp et) {
        this.et = et;
    }

    @Override
    public String toString() {
        return "MidInfo{" +
                "funcId='" + funcId + '\'' +
                ", funcIdDesc='" + funcIdDesc + '\'' +
                ", serverIp='" + serverIp + '\'' +
                ", maxTime=" + maxTime +
                ", minTime=" + minTime +
                ", pk='" + pk + '\'' +
                ", et=" + et +
                '}';
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Build for the log-analysis Flink job (Flink 1.14.0, Scala 2.11, Java 8).
    Fixes vs the original:
      - removed stray ';' after the xmlns attribute values (invalid XML, a
        mailing-list copy artifact);
      - maven.compiler.source/target were declared twice — declared once;
      - flink-clients was declared twice (flink-clients_${scala.binary.version}
        and a hard-coded flink-clients_2.11 resolve to the same artifact) —
        kept a single declaration;
      - hard-coded _2.11 artifact suffixes now use ${scala.binary.version}
        (same value, one place to change).
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>ryi_migrate</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.14.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <!-- Flink core (provided by the cluster at runtime). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Logging (provided). -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- Kafka connector (bundled into the fat jar). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>

        <!-- JDBC connector + MySQL driver for the action lookup and the sink. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.4</version>
        </dependency>

        <!-- Table API / SQL. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Fat jar with all non-provided dependencies for cluster submission. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

回复