答复: Flink 在什么情况下产生乱序问题?

2019-03-06 文章 戴嘉诚
你可以了解下触发器,默认的触发器是按照你发现的做,如果你要实时输出,可以吧触发器更改为ContinuonsEventTimeTrigger 
,然后设置你的时间间隔。

发件人: 刘 文
发送时间: 2019年3月6日 22:55
收件人: user-zh@flink.apache.org
抄送: qcx978132...@gmail.com
主题: Re: Flink 在什么情况下产生乱序问题?

).在验证EventTime 加watermark 处理中,我发现往socket发送的数据,不能及时输出或没有输出
).验证发现,只有当前发送的数据的 getCurrentWatermark()的时间戳 > TimeWindow + maxOutOfOrderness 
时,才会触发结束上一次window
).可是最新的记录是不能及时被处理,或者是不能被处理
).请问这个问题怎么处理?









---

> 在 2019年3月6日,下午10:29,刘 文  写道:
> 
> 该问题,明白一点了,整理成文档供大家参考
> ———
> 
> Flink 1.7.2 业务时间戳分析流式数据源码分析: 
> https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md
>  
> 
> 
> 
> ———
> 
> 
> 
> Flink 1.7.2 业务时间戳分析流式数据源码分析
> 
>  
> 源码
> 
> https://github.com/opensourceteams/flink-maven-scala 
> 
>  
> 概述
> 
> 由于Flink默认的ProcessTime是按Window收到Source发射过来的数据的时间,来算了,也就是按Flink程序接收的时间来进行计算,但实际业务,处理周期性的数据时,每5分钟内的数据,每1个小时内的数据进行分析,实际是业务源发生的时间来做为实际时间,所以用Flink的EventTime和Watermark来处理这个问题
> 指定Env为EventTime
> 调置数据流assignTimestampsAndWatermarks函数,由AssignerWithPeriodicWatermarks中的extractTimestamp()函数提取实际业务时间,getCurrentWatermark得到最新的时间,这个会对每个元素算一次,拿最大的当做计算时间,如果当前时间,大于上一次的时间间隔
>  + 这里设置的延时时间,就会结束上一个Window,也就是对这一段时间的Window进行操作
> 本程序以指定业务时间,来做为统计时间
>  
> 程序
> 
> package 
> com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime
> 
> import java.util.{Date, Properties}
> 
> import com.alibaba.fastjson.JSON
> import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
> import org.apache.flink.streaming.api.watermark.Watermark
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
> import org.apache.flink.util.Collector
> 
> 
> object SockWordCountRun {
> 
> 
> 
>   def main(args: Array[String]): Unit = {
> 
> 
> // get the execution environment
>// val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
> 
> 
> val configuration : Configuration = 
> ConfigurationUtil.getConfiguration(true)
> 
> val env:StreamExecutionEnvironment = 
> StreamExecutionEnvironment.createLocalEnvironment(1,configuration)
> 
> 
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> 
> 
> 
> import org.apache.flink.streaming.api.scala._
> val dataStream = env.socketTextStream("localhost", 1234, '\n')
> 
>  // .setParallelism(3)
> 
> 
> dataStream.assignTimestampsAndWatermarks(new 
> AssignerWithPeriodicWatermarks[String] {
> 
> val maxOutOfOrderness =  2 * 1000L // 3.5 seconds
> var currentMaxTimestamp: Long = _
> var currentTimestamp: Long = _
> 
> override def getCurrentWatermark: Watermark =  new 
> Watermark(currentMaxTimestamp - maxOutOfOrderness)
> 
> override def extractTimestamp(element: String, 
> previousElementTimestamp: Long): Long = {
>   val jsonObject = JSON.parseObject(element)
> 
>   val timestamp = jsonObject.getLongValue("extract_data_time")
>   currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
>   currentTimestamp = timestamp
> 
> /*  println("===watermark begin===")
>   println()
>   println(new Date(currentMaxTimestamp - 20 * 1000))
>   println(jsonObject)
>   println("===watermark end===")
>   println()*/
>   timestamp
> }
> 
>   })
>   .timeWindowAll(Time.seconds(3))
> 
>   .process(new ProcessAllWindowFunction[String,String,TimeWindow]() {
>   override def process(context: Context, elements: Iterable[String], out: 
> Collector[String]): Unit = {
> 
> 
> println()
> println("开始提交window")
> println(new Date())
> for(e <- elements) out.collect(e)
> 

sql-client batch 模式执行报错

2019-03-06 文章 yuess_coder
我在sql-client提交任务:


create table csv_source1(
id varchar,
name varchar
) with (
type ='csv',
path = '/Users/IdeaProjects/github/apache-flink/build-target/bin/test1.csv'
);




create table csv_sink(
id varchar,
name varchar
) with (
type ='csv',
path = '/Users/IdeaProjects/github/apache-flink/build-target/bin/test4.csv'
);


insert into csv_sink  select t1.name,t1.id from csv_source1 t1




错误是org.apache.flink.table.api.TableEnvironment这个类1300行空指针,用execution 
batch模式不行,用execution streaming模式是可以的。请问下才能batch模式执行这个sql?

Re: Flink 在什么情况下产生乱序问题?

2019-03-06 文章 刘 文
).在验证EventTime 加watermark 处理中,我发现往socket发送的数据,不能及时输出或没有输出
).验证发现,只有当前发送的数据的 getCurrentWatermark()的时间戳 > TimeWindow + maxOutOfOrderness 
时,才会触发结束上一次window
).可是最新的记录是不能及时被处理,或者是不能被处理
).请问这个问题怎么处理?









---

> 在 2019年3月6日,下午10:29,刘 文  写道:
> 
> 该问题,明白一点了,整理成文档供大家参考
> ———
> 
> Flink 1.7.2 业务时间戳分析流式数据源码分析: 
> https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md
>  
> 
> 
> 
> ———
> 
> 
> 
> Flink 1.7.2 业务时间戳分析流式数据源码分析
> 
>  
> 源码
> 
> https://github.com/opensourceteams/flink-maven-scala 
> 
>  
> 概述
> 
> 由于Flink默认的ProcessTime是按Window收到Source发射过来的数据的时间,来算了,也就是按Flink程序接收的时间来进行计算,但实际业务,处理周期性的数据时,每5分钟内的数据,每1个小时内的数据进行分析,实际是业务源发生的时间来做为实际时间,所以用Flink的EventTime和Watermark来处理这个问题
> 指定Env为EventTime
> 调置数据流assignTimestampsAndWatermarks函数,由AssignerWithPeriodicWatermarks中的extractTimestamp()函数提取实际业务时间,getCurrentWatermark得到最新的时间,这个会对每个元素算一次,拿最大的当做计算时间,如果当前时间,大于上一次的时间间隔
>  + 这里设置的延时时间,就会结束上一个Window,也就是对这一段时间的Window进行操作
> 本程序以指定业务时间,来做为统计时间
>  
> 程序
> 
> package 
> com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime
> 
> import java.util.{Date, Properties}
> 
> import com.alibaba.fastjson.JSON
> import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
> import org.apache.flink.streaming.api.watermark.Watermark
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
> import org.apache.flink.util.Collector
> 
> 
> object SockWordCountRun {
> 
> 
> 
>   def main(args: Array[String]): Unit = {
> 
> 
> // get the execution environment
>// val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
> 
> 
> val configuration : Configuration = 
> ConfigurationUtil.getConfiguration(true)
> 
> val env:StreamExecutionEnvironment = 
> StreamExecutionEnvironment.createLocalEnvironment(1,configuration)
> 
> 
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> 
> 
> 
> import org.apache.flink.streaming.api.scala._
> val dataStream = env.socketTextStream("localhost", 1234, '\n')
> 
>  // .setParallelism(3)
> 
> 
> dataStream.assignTimestampsAndWatermarks(new 
> AssignerWithPeriodicWatermarks[String] {
> 
> val maxOutOfOrderness =  2 * 1000L // 3.5 seconds
> var currentMaxTimestamp: Long = _
> var currentTimestamp: Long = _
> 
> override def getCurrentWatermark: Watermark =  new 
> Watermark(currentMaxTimestamp - maxOutOfOrderness)
> 
> override def extractTimestamp(element: String, 
> previousElementTimestamp: Long): Long = {
>   val jsonObject = JSON.parseObject(element)
> 
>   val timestamp = jsonObject.getLongValue("extract_data_time")
>   currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
>   currentTimestamp = timestamp
> 
> /*  println("===watermark begin===")
>   println()
>   println(new Date(currentMaxTimestamp - 20 * 1000))
>   println(jsonObject)
>   println("===watermark end===")
>   println()*/
>   timestamp
> }
> 
>   })
>   .timeWindowAll(Time.seconds(3))
> 
>   .process(new ProcessAllWindowFunction[String,String,TimeWindow]() {
>   override def process(context: Context, elements: Iterable[String], out: 
> Collector[String]): Unit = {
> 
> 
> println()
> println("开始提交window")
> println(new Date())
> for(e <- elements) out.collect(e)
> println("结束提交window")
> println(new Date())
> println()
>   }
> })
> 
>   .print()
>   //.setParallelism(3)
> 
> 
> 
> 
> 
> 
> 

Re: Flink 在什么情况下产生乱序问题?

2019-03-06 文章 Congxian Qiu
hi
对于 kafka 来说,单 partition 内的消息可以保证顺序,但是 partition A 和 partition B 之间的消息顺序是没法保证的。

Best, Congxian
On Mar 5, 2019, 18:35 +0800, 刘 文 , wrote:
> 请教一下,大家说的Flink 乱序问题,是什么情况下产生,我没明白?
> ).谁给我一下会产生乱序问题的场景吗?
> ).以下是读取kafka中的数据,三个并行度
> ).输出的结果如下:(总数据20条)
>
> 3> Message_3
> 1> Message_1
> 2> Message_2
> 1> Message_4
> 2> Message_5
> 3> Message_6
> 2> Message_8
> 1> Message_7
> 2> Message_11
> 3> Message_9
> 2> Message_14
> 1> Message_10
> 2> Message_17
> 3> Message_12
> 2> Message_20
> 1> Message_13
> 3> Message_15
> 1> Message_16
> 3> Message_18
> 1> Message_19