??????????????????midStream???????????????????????????????????????????? | //6???????????????????????? ???????????????? StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //????????????1?? ?????????????????? ?????????????????????????????????????????????????? //tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L)); // tableEnv.createTemporaryView("tableRequest", outRequestDataStream); // tableEnv.createTemporaryView("tableAnswer", outAnswerDataStream); Table tableRequest =tableEnv.fromDataStream(outRequestDataStream, $("funcId"), $("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"), $("eventTime").rowtime().as("et")); // Table tableRequest = tableEnv.fromDataStream(outRequestDataStream, Schema.newBuilder() // .column("funcId", DataTypes.STRING()) // .column("serverIp", DataTypes.STRING()) // .column("outTime", DataTypes.BIGINT()) // .column("handleSerialNo", DataTypes.STRING()) // .column("info", DataTypes.STRING()) // .column("funcIdDesc", DataTypes.STRING()) // .column("eventTime", DataTypes.TIMESTAMP(3)) // .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ") // .build()); Table tableAnswer =tableEnv.fromDataStream(outAnswerDataStream, $("funcId"), $("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"), $("eventTime").rowtime()); // Table tableAnswer = tableEnv.fromDataStream(outAnswerDataStream, Schema.newBuilder() // .column("funcId", DataTypes.STRING()) // .column("serverIp", DataTypes.STRING()) // .column("outTime", DataTypes.BIGINT()) // .column("handleSerialNo", DataTypes.STRING()) // .column("info", DataTypes.STRING()) // .column("funcIdDesc", DataTypes.STRING()) // .column("eventTime", DataTypes.TIMESTAMP(3)) // .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ") // .build());
Table result = tableEnv.sqlQuery("select \n" + "\ta.funcId as funcId ,\n" + "\ta.funcIdDesc as funcIdDesc,\n" + "\ta.serverIp as serverIp,\n" + "\tb.outTime as maxTime,\n" + "\ta.outTime as minTime,\t\n" + "\tconcat(a.funcId,a.serverIp) as pk ,\n" + " a.et as et\n" + " from " + tableRequest + " a\n " + " inner join " + tableAnswer + " b" + " on a.handleSerialNo=b.handleSerialNo "); System.out.println("??????resultTable" + result); result.printSchema(); tableEnv.createTemporaryView("resultTable", result); DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, MidInfo.class); Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("et").rowtime()) .select($("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("et")); midTable.printSchema(); tableEnv.createTemporaryView("midTable1", midTable); //????TVF???????????????????????????? Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" + "FROM TABLE(CUMULATE(\n" + " TABLE midTable1" + //" TABLE "+ midTable + " , DESCRIPTOR(et)\n" + " , INTERVAL '60' SECOND\n" + " , INTERVAL '1' DAY))\n" + " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); resulTable.printSchema(); | ???????????????????? | package job; import bean.BaseInfo; import bean.MidInfo; import bean.OutInfo; import bean.ResultInfo; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import config.FlinkConfig; import function.MyProcessFunction; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; import org.apache.flink.util.OutputTag; import sink.Sink2Mysql; import utils.DateUtil; import utils.DateUtils; import utils.JdbcUtil; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.Timestamp; import java.time.*; import java.util.Date; import java.util.HashMap; import java.util.Properties; import static org.apache.flink.table.api.Expressions.$; /** * @Author ?? * @Time 2023/5/10 8:32 * ???? flink process???????????????????? ???????????????????????????????????????????????? * ??????????????????????????????????????????????????????????????????????row_time as cast(CURRENT_TIMESTAMP AS timestamp(3) )?? * ???????????????????????????? */ public class RytLogAnly4 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //???????????? OutputTag<BaseInfo> requestStream = new OutputTag<BaseInfo>("requestStream") { }; OutputTag<BaseInfo> answerStream = new OutputTag<BaseInfo>("answerStream") { }; //1??????????????kafka?????? String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers"); String topicName = FlinkConfig.config.getProperty("dev_topicName"); String groupId = FlinkConfig.config.getProperty("dev_groupId"); String devMode = FlinkConfig.config.getProperty("dev_mode"); Properties prop = new Properties(); prop.setProperty("bootstrap.servers", servers); prop.setProperty("group.id", groupId); prop.setProperty("auto.offset.reset", devMode); DataStreamSource<String> sourceStream = env.addSource(new FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop)); sourceStream.print("sourceStream"); //{"ip":"10.125.8.141","data":"????: -- 14:28:05.111 -- <44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=f7olmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"} //2????????????????????????baseInfo?????????? SingleOutputStreamOperator<BaseInfo> baseInfoStream = sourceStream.map(new MapFunction<String, BaseInfo>() { @Override public BaseInfo map(String value) throws Exception { JSONObject jsonObject = JSON.parseObject(value); //??????????????????IP String serverIp = jsonObject.getString("ip"); //????????????data?????? String datas = jsonObject.getString("data"); String[] splits = datas.split("\n"); HashMap<String, String> dataMap = new HashMap<>(); //??time??????????????????????????????????num?????????????????? String time = splits[0].substring(7, 19); //??subData?????????????????????????????????????????? String subData = datas.substring(0, 10); for (int i = 0; i < splits.length; i++) { if (splits[i].contains("=")) { splits[i] = splits[i].replaceFirst("=", "&"); String[] temp = splits[i].split("&"); if (temp.length > 1) { dataMap.put(temp[0].toLowerCase(), temp[1]); } } } return new BaseInfo(dataMap.get("action"), serverIp, DateUtil.string2Long(time), dataMap.get("handleserialno"), subData); } }); baseInfoStream.print("baseInfoStream"); //3??????process????????baseInfoStream?????? SingleOutputStreamOperator<BaseInfo> tagStream = baseInfoStream.process(new MyProcessFunction(requestStream, answerStream)); //4????????????tag???????????????????? DataStream<BaseInfo> requestDataStream = tagStream.getSideOutput(requestStream); DataStream<BaseInfo> answerDataStream = tagStream.getSideOutput(answerStream); requestDataStream.print("requestDataStream"); answerDataStream.print("answerDataStream"); //5????????????????????????action????????????????action????????????????????????MySQL?????? //5.1 ?????????????????? SingleOutputStreamOperator<OutInfo> outRequestDataStream = requestDataStream.map(new MapFunction<BaseInfo, OutInfo>() { @Override public OutInfo map(BaseInfo value) throws Exception { //??????????????????????action String actionType = value.getFuncId(); System.out.println(actionType); String actionName = null; Connection connection = null; PreparedStatement ps = null; //??????????action??MySQL?????????????????????? try { String sql = "select action_name from ActionType where action = ?"; connection = JdbcUtil.getConnection(); ps = connection.prepareStatement(sql); ps.setString(1, actionType); ResultSet resultSet = ps.executeQuery(); System.out.println("resultSet??" + resultSet); if (resultSet.next()) { actionName = resultSet.getString("action_name"); } } catch (Exception e) { throw new RuntimeException(e); } finally { JdbcUtil.closeResource(connection, ps); } // return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName,DateUtils.format(new Date())); return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName, System.currentTimeMillis() ); } }).assignTimestampsAndWatermarks(WatermarkStrategy.<OutInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)-> element.getEventTime()));; outRequestDataStream.print("outRequestDataStream"); //5.2 ???????????????? SingleOutputStreamOperator<OutInfo> outAnswerDataStream = answerDataStream.map(new MapFunction<BaseInfo, OutInfo>() { @Override public OutInfo map(BaseInfo value) throws Exception { //??????????????????????action String actionType = value.getFuncId(); System.out.println(actionType); String actionName = null; Connection connection = null; PreparedStatement ps = null; //??????????action??MySQL?????????????????????? try { String sql = "select action_name from ActionType where action = ?"; connection = JdbcUtil.getConnection(); ps = connection.prepareStatement(sql); ps.setString(1, actionType); ResultSet resultSet = ps.executeQuery(); System.out.println("resultSet??" + resultSet); if (resultSet.next()) { actionName = resultSet.getString("action_name"); } } catch (Exception e) { throw new RuntimeException(e); } finally { JdbcUtil.closeResource(connection, ps); } // return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName, DateUtils.format(new Date())); return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName, System.currentTimeMillis() ); } }).assignTimestampsAndWatermarks(WatermarkStrategy.<OutInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)-> element.getEventTime())); outAnswerDataStream.print("outAnswerDataStream"); //6???????????????????????? ???????????????? StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //????????????1?? ?????????????????? ?????????????????????????????????????????????????? //tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L)); // tableEnv.createTemporaryView("tableRequest", outRequestDataStream); // tableEnv.createTemporaryView("tableAnswer", outAnswerDataStream); Table tableRequest =tableEnv.fromDataStream(outRequestDataStream, $("funcId"), $("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"), $("eventTime").rowtime().as("et")); // Table tableRequest = tableEnv.fromDataStream(outRequestDataStream, Schema.newBuilder() // .column("funcId", DataTypes.STRING()) // .column("serverIp", DataTypes.STRING()) // .column("outTime", DataTypes.BIGINT()) // .column("handleSerialNo", DataTypes.STRING()) // .column("info", DataTypes.STRING()) // .column("funcIdDesc", DataTypes.STRING()) // .column("eventTime", DataTypes.TIMESTAMP(3)) // .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ") // .build()); Table tableAnswer =tableEnv.fromDataStream(outAnswerDataStream, $("funcId"), $("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"), $("eventTime").rowtime()); // Table tableAnswer = tableEnv.fromDataStream(outAnswerDataStream, Schema.newBuilder() // .column("funcId", DataTypes.STRING()) // .column("serverIp", DataTypes.STRING()) // .column("outTime", DataTypes.BIGINT()) // .column("handleSerialNo", DataTypes.STRING()) // .column("info", DataTypes.STRING()) // .column("funcIdDesc", DataTypes.STRING()) // .column("eventTime", DataTypes.TIMESTAMP(3)) // .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ") // .build()); Table result = tableEnv.sqlQuery("select \n" + "\ta.funcId as funcId ,\n" + "\ta.funcIdDesc as funcIdDesc,\n" + "\ta.serverIp as serverIp,\n" + "\tb.outTime as maxTime,\n" + "\ta.outTime as minTime,\t\n" + "\tconcat(a.funcId,a.serverIp) as pk ,\n" + " a.et as et\n" + " from " + tableRequest + " a\n " + " inner join " + tableAnswer + " b" + " on a.handleSerialNo=b.handleSerialNo "); System.out.println("??????resultTable" + result); result.printSchema(); tableEnv.createTemporaryView("resultTable", result); DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, MidInfo.class); Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("et").rowtime()) .select($("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("et")); midTable.printSchema(); tableEnv.createTemporaryView("midTable1", midTable); //????TVF???????????????????????????? Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" + "FROM TABLE(CUMULATE(\n" + " TABLE midTable1" + //" TABLE "+ midTable + " , DESCRIPTOR(et)\n" + " , INTERVAL '60' SECOND\n" + " , INTERVAL '1' DAY))\n" + " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); resulTable.printSchema(); //tableEnv.executeSql("select * from "+resulTable).print(); // DataStream<Tuple2<Boolean, ResultInfo>> resultStream = tableEnv.toRetractStream(resulTable, ResultInfo.class); // resultStream.print("resultStream"); // SingleOutputStreamOperator<ResultInfo> resultInfoStream = resultStream.map(new MapFunction<Tuple2<Boolean, ResultInfo>, ResultInfo>() { // @Override // public ResultInfo map(Tuple2<Boolean, ResultInfo> value) throws Exception { // return value.f1; // } // }); // resultInfoStream.print("resultInfoStream"); // resultInfoStream.addSink(new Sink2Mysql()); env.execute(); } } | | | ???????? | | ccc0606fight...@163.com | ---- ???????????? ---- | ?????? | L Y<531599...@qq.com.INVALID> | | ???????? | 2023??5??20?? 01:10 | | ?????? | user-zh<user-zh@flink.apache.org> | | ???? | ??????table api????rowtime?????? | HI?????????? ??????????????????midStream??????????????????????????????????????????????midStream???????????????????????????????????????????????????????????????????????? ?????? SingleOutputStreamOperator<Event> eventStream = env .fromElements( .............. ).assignTimestampsAndWatermarks( WatermarkStrategy.<Event>forMonotonousTimestamps() .withTimestampAssigner( new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event event, long l) { return event.timestamp; } } ) ); ??????????????????????midStream????????????????flink?????? LY 531599...@qq.com L Y 531599...@qq.com ------------------ ???????? ------------------ ??????: "user-zh" <ccc0606fight...@163.com>; ????????: 2023??5??17??(??????) ????9:28 ??????: "user-zh"<user-zh@flink.apache.org>; ????: table api????rowtime?????? ???????????????????????????? | Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime()); tableEnv.createTemporaryView("midTable1",midTable); Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" + "FROM TABLE(CUMULATE(\n" + " TABLE midTable1"+ //" TABLE "+ midTable + " , DESCRIPTOR(eventTime)\n" + " , INTERVAL '60' SECOND\n" + " , INTERVAL '1' DAY))\n" + " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); | ??????????????????????????????????????eventTime??rowtime,????????????????sqlQuery??????????????????????Rowtime timestamp is not defined. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic ?????????????????????????? | | ???????? | | ccc0606fight...@163.com |