??????????????????midStream????????????????????????????????????????????
|
//6???????????????????????? ????????????????
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//????????????1?? ?????????????????? 
??????????????????????????????????????????????????
//tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));
//        tableEnv.createTemporaryView("tableRequest", outRequestDataStream);
//        tableEnv.createTemporaryView("tableAnswer", outAnswerDataStream);
Table tableRequest =tableEnv.fromDataStream(outRequestDataStream, $("funcId"), 
$("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"), 
$("eventTime").rowtime().as("et"));
//        Table tableRequest = tableEnv.fromDataStream(outRequestDataStream, 
Schema.newBuilder()
//                .column("funcId", DataTypes.STRING())
//                .column("serverIp", DataTypes.STRING())
//                .column("outTime", DataTypes.BIGINT())
//                .column("handleSerialNo", DataTypes.STRING())
//                .column("info", DataTypes.STRING())
//                .column("funcIdDesc", DataTypes.STRING())
//                .column("eventTime", DataTypes.TIMESTAMP(3))
//                .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
//                .build());
Table tableAnswer =tableEnv.fromDataStream(outAnswerDataStream, $("funcId"), 
$("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"), 
$("eventTime").rowtime());
//        Table tableAnswer = tableEnv.fromDataStream(outAnswerDataStream, 
Schema.newBuilder()
//                .column("funcId", DataTypes.STRING())
//                .column("serverIp", DataTypes.STRING())
//                .column("outTime", DataTypes.BIGINT())
//                .column("handleSerialNo", DataTypes.STRING())
//                .column("info", DataTypes.STRING())
//                .column("funcIdDesc", DataTypes.STRING())
//                .column("eventTime", DataTypes.TIMESTAMP(3))
//                .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
//                .build());


Table result = tableEnv.sqlQuery("select \n" +
"\ta.funcId as funcId ,\n" +
"\ta.funcIdDesc as funcIdDesc,\n" +
"\ta.serverIp as serverIp,\n" +
"\tb.outTime as maxTime,\n" +
"\ta.outTime as minTime,\t\n" +
"\tconcat(a.funcId,a.serverIp) as pk ,\n" +
" a.et  as et\n" +
" from " + tableRequest + " a\n " +
" inner join " + tableAnswer + " b" +
" on a.handleSerialNo=b.handleSerialNo ");
        System.out.println("??????resultTable" + result);
        result.printSchema();
        tableEnv.createTemporaryView("resultTable", result);

        DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, 
MidInfo.class);
        Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), 
$("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), 
$("et").rowtime())
                .select($("funcId"), $("funcIdDesc"), $("serverIp"), 
$("maxTime"), $("minTime"), $("pk"), $("et"));
        midTable.printSchema();
        tableEnv.createTemporaryView("midTable1", midTable);

//????TVF????????????????????????????
Table resulTable = tableEnv.sqlQuery("SELECT 
funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +
"FROM TABLE(CUMULATE(\n" +
" TABLE midTable1" +
//" TABLE "+ midTable +
" , DESCRIPTOR(et)\n" +
" , INTERVAL '60' SECOND\n" +
" , INTERVAL '1' DAY))\n" +
" GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk");
        resulTable.printSchema();
|


????????????????????
|
package job;

import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.time.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Author ??
* @Time 2023/5/10 8:32
 * ???? flink process???????????????????? 
????????????????????????????????????????????????
* 
??????????????????????????????????????????????????????????????????????row_time 
as cast(CURRENT_TIMESTAMP  AS timestamp(3) )??
* ????????????????????????????
*/
public class RytLogAnly4 {
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


//????????????
OutputTag<BaseInfo> requestStream = new OutputTag<BaseInfo>("requestStream") {
        };
        OutputTag<BaseInfo> answerStream = new 
OutputTag<BaseInfo>("answerStream") {
        };

//1??????????????kafka??????
String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
        String topicName = FlinkConfig.config.getProperty("dev_topicName");
        String groupId = FlinkConfig.config.getProperty("dev_groupId");
        String devMode = FlinkConfig.config.getProperty("dev_mode");
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", servers);
        prop.setProperty("group.id", groupId);
        prop.setProperty("auto.offset.reset", devMode);
        DataStreamSource<String> sourceStream = env.addSource(new 
FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop));
        sourceStream.print("sourceStream");
//{"ip":"10.125.8.141","data":"????: -- 14:28:05.111 -- 
<44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=f7olmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}
        //2????????????????????????baseInfo??????????
SingleOutputStreamOperator<BaseInfo> baseInfoStream = sourceStream.map(new 
MapFunction<String, BaseInfo>() {
@Override
public BaseInfo map(String value) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
//??????????????????IP
String serverIp = jsonObject.getString("ip");
//????????????data??????
String datas = jsonObject.getString("data");

                String[] splits = datas.split("\n");
                HashMap<String, String> dataMap = new HashMap<>();
//??time??????????????????????????????????num??????????????????
String time = splits[0].substring(7, 19);
//??subData??????????????????????????????????????????
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
                        splits[i] = splits[i].replaceFirst("=", "&");
                        String[] temp = splits[i].split("&");
if (temp.length > 1) {
                            dataMap.put(temp[0].toLowerCase(), temp[1]);
                        }
                    }
                }
return new BaseInfo(dataMap.get("action"), serverIp, 
DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
            }
        });
        baseInfoStream.print("baseInfoStream");
//3??????process????????baseInfoStream??????
SingleOutputStreamOperator<BaseInfo> tagStream = baseInfoStream.process(new 
MyProcessFunction(requestStream, answerStream));

//4????????????tag????????????????????
DataStream<BaseInfo> requestDataStream = tagStream.getSideOutput(requestStream);
        DataStream<BaseInfo> answerDataStream = 
tagStream.getSideOutput(answerStream);

        requestDataStream.print("requestDataStream");
        answerDataStream.print("answerDataStream");

//5????????????????????????action????????????????action????????????????????????MySQL??????
//5.1 ??????????????????
SingleOutputStreamOperator<OutInfo> outRequestDataStream = 
requestDataStream.map(new MapFunction<BaseInfo, OutInfo>() {
@Override
public OutInfo map(BaseInfo value) throws Exception {
//??????????????????????action
String actionType = value.getFuncId();
                System.out.println(actionType);
                String actionName = null;
                Connection connection = null;
                PreparedStatement ps = null;

//??????????action??MySQL??????????????????????
try {
                    String sql = "select action_name from ActionType where 
action = ?";
                    connection = JdbcUtil.getConnection();
                    ps = connection.prepareStatement(sql);
                    ps.setString(1, actionType);
                    ResultSet resultSet = ps.executeQuery();
                    System.out.println("resultSet??" + resultSet);
if (resultSet.next()) {
                        actionName = resultSet.getString("action_name");
                    }
                } catch (Exception e) {
throw new RuntimeException(e);
                } finally {
                    JdbcUtil.closeResource(connection, ps);
                }
//                return new OutInfo(value.getFuncId(), value.getServerIp(), 
value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), 
actionName,DateUtils.format(new Date()));
return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), 
value.getHandleSerialNo(), value.getInfo(), actionName, 
System.currentTimeMillis() );
            }
        
}).assignTimestampsAndWatermarks(WatermarkStrategy.<OutInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)->
 element.getEventTime()));;
        outRequestDataStream.print("outRequestDataStream");


//5.2 ????????????????
SingleOutputStreamOperator<OutInfo> outAnswerDataStream = 
answerDataStream.map(new MapFunction<BaseInfo, OutInfo>() {
@Override
public OutInfo map(BaseInfo value) throws Exception {
//??????????????????????action
String actionType = value.getFuncId();
                System.out.println(actionType);
                String actionName = null;
                Connection connection = null;
                PreparedStatement ps = null;

//??????????action??MySQL??????????????????????
try {
                    String sql = "select action_name from ActionType where 
action = ?";
                    connection = JdbcUtil.getConnection();
                    ps = connection.prepareStatement(sql);
                    ps.setString(1, actionType);
                    ResultSet resultSet = ps.executeQuery();
                    System.out.println("resultSet??" + resultSet);
if (resultSet.next()) {
                        actionName = resultSet.getString("action_name");
                    }
                } catch (Exception e) {
throw new RuntimeException(e);
                } finally {
                    JdbcUtil.closeResource(connection, ps);
                }
//                return new OutInfo(value.getFuncId(), value.getServerIp(), 
value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName, 
DateUtils.format(new Date()));
return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), 
value.getHandleSerialNo(), value.getInfo(), actionName, 
System.currentTimeMillis() );
            }
        
}).assignTimestampsAndWatermarks(WatermarkStrategy.<OutInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)->
 element.getEventTime()));
        outAnswerDataStream.print("outAnswerDataStream");


//6???????????????????????? ????????????????
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//????????????1?? ?????????????????? 
??????????????????????????????????????????????????
//tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));
//        tableEnv.createTemporaryView("tableRequest", outRequestDataStream);
//        tableEnv.createTemporaryView("tableAnswer", outAnswerDataStream);
Table tableRequest =tableEnv.fromDataStream(outRequestDataStream, $("funcId"), 
$("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"), 
$("eventTime").rowtime().as("et"));
//        Table tableRequest = tableEnv.fromDataStream(outRequestDataStream, 
Schema.newBuilder()
//                .column("funcId", DataTypes.STRING())
//                .column("serverIp", DataTypes.STRING())
//                .column("outTime", DataTypes.BIGINT())
//                .column("handleSerialNo", DataTypes.STRING())
//                .column("info", DataTypes.STRING())
//                .column("funcIdDesc", DataTypes.STRING())
//                .column("eventTime", DataTypes.TIMESTAMP(3))
//                .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
//                .build());
Table tableAnswer =tableEnv.fromDataStream(outAnswerDataStream, $("funcId"), 
$("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"), 
$("eventTime").rowtime());
//        Table tableAnswer = tableEnv.fromDataStream(outAnswerDataStream, 
Schema.newBuilder()
//                .column("funcId", DataTypes.STRING())
//                .column("serverIp", DataTypes.STRING())
//                .column("outTime", DataTypes.BIGINT())
//                .column("handleSerialNo", DataTypes.STRING())
//                .column("info", DataTypes.STRING())
//                .column("funcIdDesc", DataTypes.STRING())
//                .column("eventTime", DataTypes.TIMESTAMP(3))
//                .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
//                .build());


Table result = tableEnv.sqlQuery("select \n" +
"\ta.funcId as funcId ,\n" +
"\ta.funcIdDesc as funcIdDesc,\n" +
"\ta.serverIp as serverIp,\n" +
"\tb.outTime as maxTime,\n" +
"\ta.outTime as minTime,\t\n" +
"\tconcat(a.funcId,a.serverIp) as pk ,\n" +
" a.et  as et\n" +
" from " + tableRequest + " a\n " +
" inner join " + tableAnswer + " b" +
" on a.handleSerialNo=b.handleSerialNo ");
        System.out.println("??????resultTable" + result);
        result.printSchema();
        tableEnv.createTemporaryView("resultTable", result);

        DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, 
MidInfo.class);
        Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), 
$("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), 
$("et").rowtime())
                .select($("funcId"), $("funcIdDesc"), $("serverIp"), 
$("maxTime"), $("minTime"), $("pk"), $("et"));
        midTable.printSchema();
        tableEnv.createTemporaryView("midTable1", midTable);

//????TVF????????????????????????????
Table resulTable = tableEnv.sqlQuery("SELECT 
funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +
"FROM TABLE(CUMULATE(\n" +
" TABLE midTable1" +
//" TABLE "+ midTable +
" , DESCRIPTOR(et)\n" +
" , INTERVAL '60' SECOND\n" +
" , INTERVAL '1' DAY))\n" +
" GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk");
        resulTable.printSchema();
//tableEnv.executeSql("select * from "+resulTable).print();

//        DataStream<Tuple2<Boolean, ResultInfo>> resultStream = 
tableEnv.toRetractStream(resulTable, ResultInfo.class);
//        resultStream.print("resultStream");
//        SingleOutputStreamOperator<ResultInfo> resultInfoStream = 
resultStream.map(new MapFunction<Tuple2<Boolean, ResultInfo>, ResultInfo>() {
//            @Override
//            public ResultInfo map(Tuple2<Boolean, ResultInfo> value) throws 
Exception {
//                return value.f1;
//            }
//        });
//        resultInfoStream.print("resultInfoStream");
//        resultInfoStream.addSink(new Sink2Mysql());


env.execute();


    }
}


|
| |
????????
|
|
ccc0606fight...@163.com
|
---- ???????????? ----
| ?????? | L Y<531599...@qq.com.INVALID> |
| ???????? | 2023??5??20?? 01:10 |
| ?????? | user-zh<user-zh@flink.apache.org> |
| ???? | ??????table api????rowtime?????? |
HI??????????
??????????????????midStream??????????????????????????????????????????????midStream????????????????????????????????????????????????????????????????????????
??????


SingleOutputStreamOperator<Event&gt; eventStream = env
.fromElements(
..............                ).assignTimestampsAndWatermarks(
WatermarkStrategy.<Event&gt;forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Event&gt;() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
}
)
);





??????????????????????midStream????????????????flink??????


LY
531599...@qq.com




L&nbsp;Y
531599...@qq.com



&nbsp;




------------------&nbsp;????????&nbsp;------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<ccc0606fight...@163.com&gt;;
????????:&nbsp;2023??5??17??(??????) ????9:28
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

????:&nbsp;table api????rowtime??????



????????????????????????????

| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), 
$("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), 
$("eventTime").rowtime());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
tableEnv.createTemporaryView("midTable1",midTable); Table resulTable = 
tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as 
minTime\n" 
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "FROM TABLE(CUMULATE(\n" 
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " TABLE 
midTable1"+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 //" TABLE "+ midTable 
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " , DESCRIPTOR(eventTime)\n" 
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " , INTERVAL '60' SECOND\n" 
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " , INTERVAL '1' DAY))\n" 
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
??????????????????????????????????????eventTime??rowtime,????????????????sqlQuery??????????????????????Rowtime
 timestamp is not defined. Please make sure that a proper TimestampAssigner is 
defined and the stream environment uses the EventTime time characteristic
??????????????????????????
| |
????????
|
|
ccc0606fight...@163.com
|

回复