刚才的邮件正文代码出现乱码,现在重新发送。<br/>-------------------------------------------------<br/><br/>您好!<br/><br/>我在运行Flink程序时遇到了一个问题,特来向各位大佬请教。<br/><br/>程序目标:<br/>用Flink&nbsp;SQL求窗口&amp;nbsp;Top-5,开一小时的窗口。数据源为Kafka,我分批向Kafka里传入数据。计算出的&amp;nbsp;Top-5结果,写入MySQL。<br/><br/>问题:<br/>一小时窗口设置完全没生效,事件时间和处理时间两种时间语义都测试了。我每向Kafka里传入一批数据,MySQL都会看到五条新增的&nbsp;Top-5数据,可两批源数据之间的时间间隔并没有到一小时。<br/><br/>问题代码初步定位:<br/>TUMBLE(&nbsp;TABLE&nbsp;watermarkedTable,&nbsp;DESCRIPTOR(ts),&nbsp;INTERVAL&nbsp;&#39;1&#39;&nbsp;HOUR&nbsp;)<br/><br/>完整源代码:<br/><br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;final&nbsp;StreamExecutionEnvironment&nbsp;streamExecutionEnvironment&nbsp;=&nbsp;StreamExecutionEnvironment.getExecutionEnvironment();<br/><br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;//&nbsp;Create&nbsp;table&nbsp;environment<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;StreamTableEnvironment&nbsp;streamTableEnvironment&nbsp;=&nbsp;StreamTableEnvironment.create(streamExecutionEnvironment);<br/><br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;//&nbsp;接入Kafka数据源<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;KafkaSource&lt;string&gt;&nbsp;kafkaSource&nbsp;=&nbsp;KafkaSource<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;.&lt;string&gt;builder()<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;.setBootstrapServers(Config.KAFKA_BROKERS)<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;.setTopics(Config.KAFKA_TOPIC_EVENT)<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;.setGroupId(&quot;flink-consumer&quot;)<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;.setStartingOffsets(OffsetsInitializer.earliest())<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;.setValueOnlyDeserializer(new&nbsp;SimpleStringSchema())<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;.build();<br/><br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;DataStreamSource&lt;string&gt;&nbsp;stringStream&nbsp;=&nbsp;streamExecutionEnvironment<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;.fromSource(<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;kafkaSource,<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;WatermarkStrategy.noWatermarks(),<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&quot;Kafka&nbsp;string&nbsp;source&nbsp;without&nbsp;watermark&quot;<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;);<br/><br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;//&nbsp;Deserialize&nbsp;string&nbsp;stream<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;SingleOutputStreamOperator&lt;event&gt;&nbsp;deserializedStream&nbsp;=&nbsp;stringStream<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;.map(<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;new&nbsp;MapFunction&lt;string,&nbsp;event=&quot;&quot;&gt;()&nbsp;{<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;public&nbsp;Event&nbsp;map(String&nbsp;jsonString)&nbsp;throws&nbsp;Exception&nbsp;{<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;SimpleDateFormat&nbsp;simpleDateFormat&nbsp;=&nbsp;new&nbsp;SimpleDateFormat(&quot;yyyy-MM-dd&nbsp;HH:mm:ss&quot;);<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;ObjectMapper&nbsp;objectMapper&nbsp;=&nbsp;new&nbsp;ObjectMapper();<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;objectMapper.setDateFormat(simpleDateFormat);<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,&nbsp;false);<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;Event&nbsp;deserializedObject&nbsp;=&nbsp;objectMapper.readValue(jsonString,&nbsp;Event.class);<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;return&nbsp;deserializedObject;<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;}<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;});<br/><br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;SingleOutputStreamOperator&lt;event&gt;&nbsp;watermarkedStream&nbsp;=&nbsp;deserializedStream.assignTimestampsAndWatermarks(<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;WatermarkStrategy<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;.&lt;event&gt;forBoundedOutOfOrderness(Duration.ofSeconds(0L))<br/>//&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;.withTimestampAssigner((event,&nbsp;l)&nbsp;-&amp;gt;&nbsp;event.getTs().getTime())<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;.withTimestampAssigner(<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;new&nbsp;SerializableTimestampAssigner&lt;event&gt;()&nbsp;{<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;@Override<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;public&nbsp;long&nbsp;extractTimestamp(Event&nbsp;event,&nbsp;long&nbsp;kafkaTimestamp)&nbsp;{<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;Timestamp&nbsp;timestamp&nbsp;=&nbsp;event.getOccurrentTime();<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;long&nbsp;time&nbsp;=&nbsp;timestamp.getTime();<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;return&nbsp;time;<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;}<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;}<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;)<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;);<br/><br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;//&nbsp;将流数据转换成表<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;Table&nbsp;watermarkedTable&nbsp;=&nbsp;streamTableEnvironment.fromDataStream(<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;watermarkedStream,<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;$(&quot;uid&quot;),<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;$(&quot;eventId&quot;),<br/>//&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;$(&quot;eventName&quot;),<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;$(&quot;serviceId&quot;),<br/>//&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;$(&quot;serviceName&quot;),<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;$(&quot;occurrentTime&quot;).rowtime().as(&quot;ts&quot;)<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;);<br/><br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;//&nbsp;注册table用于SQL&nbsp;API<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;streamTableEnvironment.createTemporaryView(&quot;watermarkedTable&quot;,&nbsp;watermarkedTable);<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;//&nbsp;watermarkedTable.printSchema();<br/><br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;String&nbsp;countQuery&nbsp;=<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&quot;SELECT&nbsp;eventId,&nbsp;&quot;&nbsp;+<br/>//&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&quot;eventName,&nbsp;&quot;&nbsp;+<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&quot;serviceId,&nbsp;&quot;&nbsp;+<br/>//&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&quot;serviceName,&nbsp;&quot;&nbsp;+<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&quot;COUNT(*)&nbsp;as&nbsp;eventCount,&nbsp;&quot;&nbsp;+<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&quot;COUNT(DISTINCT&nbsp;uid)&nbsp;as&nbsp;userCount&nbsp;&quot;&nbsp;+<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&quot;FROM&nbsp;TABLE&nbsp;(&nbsp;&quot;&nbsp;+<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&quot;TUMBLE(&nbsp;TABLE&nbsp;watermarkedTable,&nbsp;DESCRIPTOR(ts),&nbsp;INTERVAL&nbsp;&#39;1&#39;&nbsp;HOUR&nbsp;)&nbsp;&quot;&nbsp;+<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&quot;)&nbsp;&quot;&nbsp;+<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&quot;GROUP&nbsp;BY&nbsp;eventId,&nbsp;serviceId&nbsp;&quot;;<br/><br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;String&nbsp;sortEventQuery&nbsp;=<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&quot;SELECT&nbsp;*,&nbsp;&quot;&nbsp;+<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&quot;ROW_NUMBER()&nbsp;OVER&nbsp;(&nbsp;&quot;&nbsp;+<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&quot;ORDER&nbsp;BY&nbsp;eventCount&nbsp;desc&nbsp;&quot;&nbsp;+<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&quot;)&nbsp;AS&nbsp;eventRank&nbsp;&quot;&nbsp;+<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&quot;FROM&nbsp;(&quot;&nbsp;+&nbsp;countQuery&nbsp;+&nbsp;&quot;)&nbsp;&quot;;<br/><br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;String&nbsp;top5Query&nbsp;=<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&quot;SELECT&nbsp;*&nbsp;&quot;&nbsp;+<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&quot;FROM&nbsp;(&quot;&nbsp;+&nbsp;sortEventQuery&nbsp;+&nbsp;&quot;)&nbsp;&quot;&nbsp;+<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&quot;WHERE&nbsp;eventRank&nbsp;&amp;lt;=&nbsp;5&nbsp;&quot;;<br/><br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;//&nbsp;执行SQL得到结果表<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;Table&nbsp;queryResultTable&nbsp;=&nbsp;streamTableEnvironment.sqlQuery(top5Query);<br/><br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;//&nbsp;Create&nbsp;sink&nbsp;table<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;String&nbsp;top5EventTableDDL&nbsp;=<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&quot;CREATE&nbsp;TABLE&nbsp;top5EventTable&nbsp;(&nbsp;......因为涉及账号密码,暂且省略......)&quot;;<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;streamTableEnvironment.executeSql(top5EventTableDDL);<br/><br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;//&nbsp;&amp;nbsp;持久化<br/>&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;TableResult&nbsp;sinkResult&nbsp;=&nbsp;queryResultTable.executeInsert(&quot;top5EventTable&quot;);<br/><br/><br/>感谢您花时间查看这个问题!<br/>Lucas&lt;/event&gt;&lt;/event&gt;&lt;/event&gt;&lt;/string,&gt;&lt;/event&gt;&lt;/string&gt;&lt;/string&gt;&lt;/string&gt;

回复