How does Flink 1.2 partition by key, and how does that differ from hash partitioning?

2021-04-05 post




How does Flink 1.2 partition by key, and how does that differ from hash partitioning?
For example: partition index = hash(key) % parallelism?


Why not use the hash value directly for partitioning?


KeyGroupStreamPartitioner.java

@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    K key;
    try {
        key = keySelector.getKey(record.getInstance().getValue());
    } catch (Exception e) {
        throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
    }
    return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
}
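
The answer is in the last line above: the channel is not computed as hash(key) % parallelism but through a key-group indirection. The key is first hashed into one of maxParallelism key groups, and the key group is then mapped onto the current parallelism. Because keyed state is stored and snapshotted per key group, a job can be rescaled to a different parallelism without rehashing every individual key. A minimal sketch of the two-step mapping, assuming Flink's default maximum parallelism of 128 and using a plain hashCode in place of the extra murmur-hash mixing that Flink applies:

// Sketch only: the two-step key-group assignment behind assignKeyToParallelOperator.
object KeyGroupSketch {

  // Step 1: key -> key group in [0, maxParallelism). Flink additionally murmur-hashes
  // the hashCode; that mixing step is omitted here for brevity.
  def assignToKeyGroup(key: Any, maxParallelism: Int): Int =
    math.abs(key.hashCode % maxParallelism)

  // Step 2: key group -> operator index (channel) for the current parallelism.
  def operatorIndexForKeyGroup(maxParallelism: Int, parallelism: Int, keyGroup: Int): Int =
    keyGroup * parallelism / maxParallelism

  def main(args: Array[String]): Unit = {
    val maxParallelism = 128
    val parallelism = 4
    for (key <- Seq("a", "b", "c", "d")) {
      val kg = assignToKeyGroup(key, maxParallelism)
      println(s"key=$key keyGroup=$kg channel=${operatorIndexForKeyGroup(maxParallelism, parallelism, kg)}")
    }
  }
}

With plain hash % parallelism, changing the parallelism would move almost every key to a different operator; with key groups, rescaling only moves whole key groups between operators.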

With the Flink Table API, can you only sort by one column, not by multiple columns?

2019-03-15 post

The output is sorted only by id in descending order, not additionally by value1 in ascending order.







package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.orderBy

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    env.setParallelism(1)

    val dataSet = env.fromElements( (1,"a",10),(2,"b",20),(20,"f",200),(3,"c",30) )


    // convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // register the Table
    tableEnv.registerTable("user1", table)


    // query all rows of the table
    tableEnv.scan("user1").as('id, 'name, 'value1)
      //.orderBy('id.asc)  // sort by the id column, ascending (note: sorting is per partition)
      .orderBy('id.desc)
      .orderBy('value1.asc)

      .first(1000)

      // print the result (acts as the sink)
      .print()


    /**
      * Output:
      *
      * 20,f,200
      * 3,c,30
      * 2,b,20
      * 1,a,10
      */

  }

}
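
Chaining two orderBy calls does not produce a combined multi-column sort; each call defines its own sort specification, so only one of them ends up determining the order of the result. To sort by several columns, pass all sort expressions to a single orderBy call, which in the Scala Table API accepts a varargs list of expressions. A sketch of the relevant change, replacing the query section of the program above:

    // Sketch: one orderBy with both sort keys - id descending, then value1 ascending.
    tableEnv.scan("user1").as('id, 'name, 'value1)
      .orderBy('id.desc, 'value1.asc)
      .first(1000)
      .print()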



Re: Under what circumstances does Flink produce out-of-order data?

2019-03-06 post
). While testing EventTime with watermarks, I found that data sent to the socket is not output promptly, or not output at all.
). Testing shows that the previous window is only closed once the getCurrentWatermark() timestamp of the data being sent now is greater than TimeWindow + maxOutOfOrderness.
). But that means the newest records cannot be processed promptly, or cannot be processed at all.
). How should this be handled?
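
This is the expected behaviour of periodic watermarks: the watermark is derived from the largest timestamp seen so far, so it only advances when newer elements arrive, and the last open window stays open until an element with a large enough timestamp shows up. Roughly, an event-time window fires under the condition sketched below (a simplification of Flink's EventTimeTrigger; the names and numbers are illustrative):

object WindowFireSketch {
  // Simplified event-time firing condition: a window fires once the watermark
  // has passed the window's maximum timestamp (windowEnd - 1).
  def windowFires(windowEnd: Long, currentMaxTimestamp: Long, maxOutOfOrderness: Long): Boolean = {
    val watermark = currentMaxTimestamp - maxOutOfOrderness // what getCurrentWatermark returns here
    watermark >= windowEnd - 1
  }

  def main(args: Array[String]): Unit = {
    // A 3-second window [0, 3000) with maxOutOfOrderness = 2000 ms only fires
    // after an element with timestamp >= 4999 has been seen.
    println(windowFires(3000L, 4998L, 2000L)) // false
    println(windowFires(3000L, 4999L, 2000L)) // true
  }
}

So either keep sending data so the watermark keeps advancing, or emit watermarks independently of incoming elements (for example with a punctuated assigner, or one based on wall-clock time) if the source can be idle.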









---

> On March 6, 2019, at 10:29 PM, 刘 文 wrote:
> 
> I understand this problem a bit better now and have written it up as a document for reference.
> ———
> 
> Flink 1.7.2 source-code analysis of processing streaming data by business (event) timestamps:
> https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md
> 
> 
> ———
> 
> 
> 
> Flink 1.7.2 source-code analysis of processing streaming data by business (event) timestamps
> 
> Source code
> 
> https://github.com/opensourceteams/flink-maven-scala
> 
> Overview
> 
> By default, Flink's ProcessingTime is based on the time at which the window receives the data emitted by the source, i.e. the time the Flink program receives it. In real business scenarios, though, periodic data (the data of each 5-minute or 1-hour interval) should be analysed by the time it occurred at the business source, so Flink's EventTime and watermarks are used to handle this.
> Set the environment's time characteristic to EventTime.
> Call assignTimestampsAndWatermarks on the data stream. extractTimestamp() of AssignerWithPeriodicWatermarks extracts the actual business timestamp, and getCurrentWatermark() produces the latest watermark; this is evaluated per element, keeping the maximum timestamp seen so far. Once the current time exceeds the previous window's end plus the configured delay, the previous window is closed and its data is processed.
> This program uses the specified business time as the time the statistics are computed on.
> 
> Program
> 
> package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime
> 
> import java.util.{Date, Properties}
> 
> import com.alibaba.fastjson.JSON
> import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
> import org.apache.flink.streaming.api.watermark.Watermark
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
> import org.apache.flink.util.Collector
> 
> 
> object SockWordCountRun {
> 
> 
>   def main(args: Array[String]): Unit = {
> 
>     // get the execution environment
>     // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
> 
>     val configuration: Configuration = ConfigurationUtil.getConfiguration(true)
> 
>     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)
> 
> 
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> 
> 
>     import org.apache.flink.streaming.api.scala._
>     val dataStream = env.socketTextStream("localhost", 1234, '\n')
>     // .setParallelism(3)
> 
> 
>     dataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {
> 
>       val maxOutOfOrderness = 2 * 1000L // 2 seconds
>       var currentMaxTimestamp: Long = _
>       var currentTimestamp: Long = _
> 
>       override def getCurrentWatermark: Watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
> 
>       override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
>         val jsonObject = JSON.parseObject(element)
> 
>         val timestamp = jsonObject.getLongValue("extract_data_time")
>         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
>         currentTimestamp = timestamp
> 
>         /*  println("===watermark begin===")
>         println()
>         println(new Date(currentMaxTimestamp - 20 * 1000))
>         println(jsonObject)
>         println("===watermark end===")
>         println()*/
>         timestamp
>       }
> 
>     })
>       .timeWindowAll(Time.seconds(3))
> 
>       .process(new ProcessAllWindowFunction[String,String,TimeWindow]() {
>         override def process(context: Context, elemen

Under what circumstances does Flink produce out-of-order data?

2019-03-05 post
Could someone explain under what circumstances the Flink out-of-order problem that people talk about actually arises? I have not understood it.
). Can anyone give me a scenario that produces out-of-order data?
). Below is the output of reading data from Kafka with a parallelism of three.
). The output is as follows (20 records in total):

3> Message_3
1> Message_1
2> Message_2
1> Message_4
2> Message_5
3> Message_6
2> Message_8
1> Message_7
2> Message_11
3> Message_9
2> Message_14
1> Message_10
2> Message_17
3> Message_12
2> Message_20
1> Message_13
3> Message_15
1> Message_16
3> Message_18
1> Message_19
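
The output above already shows the typical cause: each parallel subtask (the 1>, 2>, 3> prefixes) preserves the order of its own Kafka partition, but observed as one stream the global order of Message_1 ... Message_20 is interleaved, so downstream operators no longer see records in their original (event-time) order. A small standalone sketch (plain Scala, no Flink, made-up timestamps) of how merging per-partition ordered streams yields a globally out-of-order sequence:

// Three "partitions", each internally ordered by timestamp.
// Interleaving them, as a single downstream observer would see them,
// produces a globally out-of-order sequence.
object OutOfOrderSketch {
  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Seq(1000L, 4000L, 7000L),  // partition 0
      Seq(2000L, 3000L, 9000L),  // partition 1
      Seq(5000L, 6000L, 8000L))  // partition 2

    // Round-robin merge: one element from each partition in turn.
    val merged = (0 until 3).flatMap(i => partitions.map(p => p(i)))

    println(merged.mkString(", "))   // 1000, 2000, 5000, 4000, 3000, 6000, 7000, 9000, 8000
    println(merged == merged.sorted) // false: the merged stream is out of order
  }
}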

Re: How to count, every five minutes, the day's total number of a certain message

2019-03-04 post

I have a few thoughts on handling this:

). By default, Flink streaming processes incremental data, analysing the data within a given time interval or count.
). You can implement a custom ProcessAllWindowFunction, i.e. write your own processing logic for all the data of one window; its input has already passed through the operators before the window.
). In your case, you need to store the result of each computation and update it in a store that every computation can read (for example Redis); a minimal sketch of this running-total idea follows this list.
). Below that is a ProcessAllWindowFunction example for reference (it implements a WordCount program whose incremental output is sorted by word in ascending order).
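
First, a sketch of the running-total idea from the previous point. It keeps the accumulated total per key in Flink keyed state rather than an external store; the window size, field names and the RichMapFunction are illustrative assumptions, not part of the example program that follows.

// Sketch only: per-key counts per 5-minute window, folded into a running total held
// in ValueState. A real setup could update Redis (or another store) in the same place.
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object RunningTotalSketch {

  case class MsgCount(msgType: String, count: Long)

  class AddToTotal extends RichMapFunction[MsgCount, MsgCount] {
    private var total: ValueState[Long] = _

    override def open(parameters: Configuration): Unit =
      total = getRuntimeContext.getState(
        new ValueStateDescriptor[Long]("total", createTypeInformation[Long]))

    override def map(in: MsgCount): MsgCount = {
      val newTotal = total.value() + in.count // empty state reads as 0 for a Scala Long
      total.update(newTotal)
      MsgCount(in.msgType, newTotal)          // running total so far
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.socketTextStream("localhost", 1234, '\n')
      .map(line => MsgCount(line.trim, 1L))
      .keyBy("msgType")
      .timeWindow(Time.minutes(5))   // one partial count per key every 5 minutes
      .sum("count")
      .keyBy("msgType")
      .map(new AddToTotal)           // fold the partial counts into a running total
      .print()

    env.execute("running-total-sketch")
  }
}

Resetting the total at day boundaries (the "for the current day" part of the question) would still need a daily window or an explicit reset, which this sketch leaves out.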







package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.sort

import java.time.ZoneId

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
import org.apache.flink.util.Collector

import scala.collection.mutable

/**
  * nc -lk 1234  to feed input data
  */
object SocketWindowWordCountLocalSinkHDFSAndWindowAllAndSorted {


  def getConfiguration(isDebug: Boolean = false): Configuration = {

    val configuration: Configuration = new Configuration()

    if (isDebug) {
      val timeout = "10 s"
      val timeoutHeartbeatPause = "100 s"
      configuration.setString("akka.ask.timeout", timeout)
      configuration.setString("akka.lookup.timeout", timeout)
      configuration.setString("akka.tcp.timeout", timeout)
      configuration.setString("akka.transport.heartbeat.interval", timeout)
      configuration.setString("akka.transport.heartbeat.pause", timeoutHeartbeatPause)
      configuration.setString("akka.watch.heartbeat.pause", timeout)
      configuration.setInteger("heartbeat.interval", 1000)
      configuration.setInteger("heartbeat.timeout", 5000)
    }

    configuration
  }

  def main(args: Array[String]): Unit = {

    val port = 1234
    // get the execution environment
    // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val configuration: Configuration = getConfiguration(true)

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)


    // get input data by connecting to the socket
    val dataStream = env.socketTextStream("localhost", port, '\n')


    import org.apache.flink.streaming.api.scala._
    val dataStreamDeal = dataStream.flatMap( w => w.split("\\s") ).map( w => WordWithCount(w,1))
      .keyBy("word")
      // send all records of the current window to the ProcessAllWindowFunction for handling (sorting, processing identical keys, etc.)
      // drawback: with a large amount of data per window this easily runs out of memory
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))

      .process(new ProcessAllWindowFunction[WordWithCount, WordWithCount, TimeWindow] {
        override def process(context: Context, elements: Iterable[WordWithCount], out: Collector[WordWithCount]): Unit = {
          val set = new mutable.HashSet[WordWithCount]{}


          for (wordCount <- elements) {
            if (set.contains(wordCount)) {
              set.remove(wordCount)
              set.add(new WordWithCount(wordCount.word, wordCount.count + 1))
            } else {
              set.add(wordCount)
            }
          }

          val sortSet = set.toList.sortWith( (a, b) => a.word.compareTo(b.word) < 0 )

          for (wordCount <- sortSet) out.collect(wordCount)
        }

      })

      //.countWindow(3)
      //.countWindow(3,1)
      //.countWindowAll(3)


    //textResult.print().setParallelism(1)

    val bucketingSink = new BucketingSink[WordWithCount]("file:/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/sink-data")

    bucketingSink.setBucketer(new DateTimeBucketer[WordWithCount]("yyyy-MM-dd--HHmm", ZoneId.of("Asia/Shanghai")))
    //bucketingSink.setWriter(new SequenceFileWriter[IntWritable, Text]())
    //bucketingSink.setWriter(new SequenceFileWriter[WordWithCount]())
    //bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB
    //bucketingSink.setBatchSize(100) // 100 bytes
    bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB
    //bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
    bucketingSink.setBatchRolloverInterval(2 * 1000); // 2 seconds
    //setInactiveBucketCheckInterval
    //setInactiveBucketThreshold
    // how often data is written to the sink; not writing on every single record avoids wasting resources

    bucketingSink.setInactiveBucketThreshold(2 * 1000)
    bucketingSink.setAsyncTimeout(1 * 1000)


    dataStreamDeal.setParallelism(1)
      .addSink(bucketingSink)


    if (args == null || args.size == 0) {
      env.execute("默认作业")

      // execution plan
      //println(env.getExecutionPlan)
      //StreamGraph
      //println(env.getStreamGraph.getStreamingPlanAsJSON)


      //JsonPlanGenerator.generatePlan(jobGraph)

    } else {
      env.execute(args(0))
    }

    println("结束")

  }


  // Data type for words with count
  case class WordWithCount(word: String, count: Long)

/*  abstract private   class 

Re: [Question] In Flink parallel computation, how do the different windows receive the data of their own partition, i.e. how does a window determine which partition it belongs to?

2019-03-04 post

Thanks everyone for the answers. I understand some of it now and have written it up in the following documents.

Flink 1.7.2 source-code analysis of the data exchange between Source and Window:
https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/flink-source-window-data-exchange.md

Flink 1.7.2 source-code analysis of parallel computation:
https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/Flink-Parallelism-Calculation.md




Flink 1.7.2 source-code analysis of parallel computation

Source code

Source code: https://github.com/opensourceteams/fink-maven-scala-2

Flink 1.7.2 source-code analysis of the data exchange between Source and Window:
https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/flink-source-window-data-exchange.md

Overview

How a Flink window computes in parallel
How the Flink source partitions records by key hash and emits them to the downstream window of the corresponding partition

WordCount program

package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.parallelism

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * nc -lk 1234  to feed input data
  */
object SocketWindowWordCountLocal {



  def main(args: Array[String]): Unit = {


    val port = 1234
    // get the execution environment
    // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    val configuration: Configuration = getConfiguration(true)

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)




    // get input data by connecting to the socket
    val dataStream = env.socketTextStream("localhost", port, '\n')


    import org.apache.flink.streaming.api.scala._
    val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w => WordWithCount(w,1))
      .keyBy("word")
      /**
        * Refresh every 20 seconds, i.e. start counting again.
        * Benefit: there is no need to keep aggregating over all the data,
        * only the incremental data within the given interval, which reduces the data volume.
        */
      .timeWindow(Time.seconds(5))
      //.countWindow(3)
      //.countWindow(3,1)
      //.countWindowAll(3)


      .sum("count")

    textResult
      .setParallelism(3)
      .print()




    if (args == null || args.size == 0) {

      println("==以下为执行计划==")
      println("执行地址(firefox效果更好):https://flink.apache.org/visualizer/")
      // execution plan
      //println(env.getExecutionPlan)
      // println("==以上为执行计划 JSON串==\n")
      //StreamGraph
      println(env.getStreamGraph.getStreamingPlanAsJSON)


      //JsonPlanGenerator.generatePlan(jobGraph)

      env.execute("默认作业")

    } else {
      env.execute(args(0))
    }

    println("结束")

  }


  // Data type for words with count
  case class WordWithCount(word: String, count: Long) {
    //override def toString: String = Thread.currentThread().getName + word + " : " + count
  }


  def getConfiguration(isDebug: Boolean = false): Configuration = {

    val configuration: Configuration = new Configuration()

    if (isDebug) {
      val timeout = "10 s"
      val timeoutHeartbeatPause = "100 s"
      configuration.setString("akka.ask.timeout", timeout)
      configuration.setString("akka.lookup.timeout", timeout)
      configuration.setString("akka.tcp.timeout", timeout)
      configuration.setString("akka.transport.heartbeat.interval", timeout)
      configuration.setString("akka.transport.heartbeat.pause", timeoutHeartbeatPause)
      configuration.setString("akka.watch.heartbeat.pause", timeout)
      configuration.setInteger("heartbeat.interval", 1000)
      configuration.setInteger("heartbeat.timeout", 5000)
    }


    configuration
  }


}


 
Input data

1 2 3 4 5 6 7 8 9 10

Source-code analysis

 

Re: Subscribe

2019-03-04 post
Please send an email to: user-zh-subscr...@flink.apache.org


You will receive a confirmation email (it may land in your spam folder); reply to it with any message and you will be subscribed.






> On March 2, 2019, at 3:55 PM, (。・ˇ_ˇ・。:) wrote:
> 
> Subscribe



Re: Flink learning

2019-03-04 post



Please send an email to: user-zh-subscr...@flink.apache.org

You will receive a confirmation email (it may land in your spam folder); reply to it with any message and you will be subscribed.


> On March 2, 2019, at 3:10 PM, 袁刚 wrote:
> 
> Flink learning



Re: [Question] In Flink parallel computation, how do the different windows receive the data of their own partition, i.e. how does a window determine which partition it belongs to?

2019-03-03 post
--
Sorry, I still have not understood this; may I ask for help again?

For example:
). When the parallelism is set to 2 with setParallelism(2), two window threads are created.
). Streaming WordCount, local, Flink 1.7.2.
). How do these two window threads read the data of their own partition, and how is a window's partition determined?
). Input data:
  1 2 3 4 5 6 7 8 9 10
). source   ->  operator   ->
--
change [partition 0]


key:1    partition:0
key:2    partition:0
key:3    partition:0
key:4    partition:0
key:6    partition:0
key:10   partition:0
 --
 change 1  [partition 1]

key:5    partition:1
key:7    partition:1
key:8    partition:1
key:9    partition:1
). window 0 (1/2)
   How is this window's partition determined?
   How does the window read the data of its partition?

). window 1 (2/2)
   How is this window's partition determined?
   How does the window read the data of its partition?
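
One way to see the mapping concretely: for keyBy, the key's hash is mapped to a key group and the key group to an operator index, so which of the two window subtasks receives a given word is fixed by KeyGroupRangeAssignment. A small sketch that prints the assignment for the input above; note it feeds the raw strings to assignKeyToParallelOperator, whereas the real job wraps the key in a Tuple, so the exact indices may differ, but the mechanism is the same. maxParallelism 128 is Flink's default.

// Sketch: predict which parallel window subtask (channel) each word is sent to.
import org.apache.flink.runtime.state.KeyGroupRangeAssignment

object KeyToChannelSketch {
  def main(args: Array[String]): Unit = {
    val maxParallelism = 128
    val parallelism    = 2
    for (word <- (1 to 10).map(_.toString)) {
      val channel = KeyGroupRangeAssignment.assignKeyToParallelOperator(word, maxParallelism, parallelism)
      println(s"key:$word -> window subtask $channel")
    }
  }
}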


--





> On March 3, 2019, at 9:26 PM, 刘 文 wrote:
> 
> WordCount.scala
> package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.parallelism
> 
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.windowing.time.Time
> 
> /**
>  * nc -lk 1234  to feed input data
>  */
> object SocketWindowWordCountLocal {
> 
> 
> 
>  def main(args: Array[String]): Unit = {
> 
> 
>val port = 1234
>// get the execution environment
>   // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
> 
> 
>val configuration : Configuration = getConfiguration(true)
> 
>    val env:StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1,configuration)
> 
> 
> 
> 
> 
>// get input data by connecting to the socket
>val dataStream = env.socketTextStream("localhost", port, '\n')
> 
> 
> 
>import org.apache.flink.streaming.api.scala._
>    val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w => WordWithCount(w,1))
>  .keyBy("word")
>  /**
>    * Refresh every 20 seconds, i.e. start counting again.
>    * Benefit: there is no need to keep aggregating over all the data,
>    * only the incremental data within the given interval, which reduces the data volume.
>    */
>  .timeWindow(Time.seconds(5))
>  //.countWindow(3)
>  //.countWindow(3,1)
>  //.countWindowAll(3)
> 
> 
>  .sum("count" )
> 
>textResult
>  .setParallelism(100)
>  .print()
> 
> 
> 
> 
>if(args == null || args.size ==0){
> 
> 
>  
>  println("==以下为执行计划==")
>  println("执行地址(firefox效果更好):https://flink.apache.org/visualizer/")
>  // execution plan
>  println(env.getExecutionPlan)
>  println("==以上为执行计划 JSON串==\n")
>  //StreamGraph
> //println(env.getStreamGraph.getStreamingPlanAsJSON)
> 
> 
> 
>  //JsonPlanGenerator.generatePlan(jobGraph)
> 
>  env.execute("默认作业")
> 
>}else{
>  env.execute(args(0))
>}
> 
>println("结束")
> 
>  }
> 
> 
>  // Data type for words with count
>  case class WordWithCount(word: String, count: Long)
> 
> 
>  def getConfiguration(isDebug:Boolean = false):Configuration = {
> 
>val configuration : Configuration = new Configuration()
> 
>if(isDebug){
>  val timeout = "10 s"
>  val timeoutHeartbeatPause = "100 s"
>  configuration.setString("akka.ask.timeout",timeout)
>  configuration.setString("akka.lookup.timeout",timeout)
>  configuration.setString("akka.tcp.timeout",timeout)
>  configuration.setString("akka.transport.heartbeat.interval",timeout)
>  configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
>  configuration.setString("akka.watch.heartbeat.pause",timeout)
>  configuration.setInteger("heartbeat.interval",1000)
>  configuration.setInteger("heartbeat.timeout",5000)
>}
> 
> 
>configuration
>  }
> 
> 
> }
> 
> 
> 
>> On March 3, 2019, at 9:05 PM, 刘 文 wrote:
>> 
>> 
>> [Question] In Flink parallel computation, how do the different windows receive the data of their own partition, i.e. how does a window determine which partition it belongs to?
>> 
>> ). Environment: Flink 1.7.2 WordCount local, stream processing.
>> ). In the source, RecordWriter.emit() assigns each element to a partition by key, so the partition of every element is already determined; the number of partitions is decided by DataStream.setParallelism(2).
>> 
>>  public void emit(T record) throws IOException, InterruptedException {
>>      emit(record, channelSelector.selectChannels(record, numChannels));
>>  }
>> 
>>  copyFromSerializerToTargetChannel(int targetChannel) then writes the data to the different channels, i.e. sends it to the window corresponding to each partition (records are sent one at a time).
>> ). As many windows are started as there are parallel instances (DataStream.setParallelism(2)).
>> 
> 



Re: [Question] In Flink parallel computation, how do the different windows receive the data of their own partition, i.e. how does a window determine which partition it belongs to?

2019-03-03 post
WordCount.scala
package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.parallelism

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * nc -lk 1234  to feed input data
  */
object SocketWindowWordCountLocal {



  def main(args: Array[String]): Unit = {


val port = 1234
// get the execution environment
   // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


val configuration : Configuration = getConfiguration(true)

val env:StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1,configuration)





// get input data by connecting to the socket
val dataStream = env.socketTextStream("localhost", port, '\n')



import org.apache.flink.streaming.api.scala._
val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w => WordWithCount(w,1))
  .keyBy("word")
  /**
    * Refresh every 20 seconds, i.e. start counting again.
    * Benefit: there is no need to keep aggregating over all the data,
    * only the incremental data within the given interval, which reduces the data volume.
    */
  .timeWindow(Time.seconds(5))
  //.countWindow(3)
  //.countWindow(3,1)
  //.countWindowAll(3)


  .sum("count" )

textResult
  .setParallelism(100)
  .print()




if(args == null || args.size ==0){


  
println("==以下为执行计划==")
  println("执行地址(firefox效果更好):https://flink.apache.org/visualizer/")
  // execution plan
  println(env.getExecutionPlan)
  println("==以上为执行计划 JSON串==\n")
  //StreamGraph
 //println(env.getStreamGraph.getStreamingPlanAsJSON)



  //JsonPlanGenerator.generatePlan(jobGraph)

  env.execute("默认作业")

}else{
  env.execute(args(0))
}

println("结束")

  }


  // Data type for words with count
  case class WordWithCount(word: String, count: Long)


  def getConfiguration(isDebug:Boolean = false):Configuration = {

val configuration : Configuration = new Configuration()

if(isDebug){
  val timeout = "10 s"
  val timeoutHeartbeatPause = "100 s"
  configuration.setString("akka.ask.timeout",timeout)
  configuration.setString("akka.lookup.timeout",timeout)
  configuration.setString("akka.tcp.timeout",timeout)
  configuration.setString("akka.transport.heartbeat.interval",timeout)
  configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
  configuration.setString("akka.watch.heartbeat.pause",timeout)
  configuration.setInteger("heartbeat.interval",1000)
  configuration.setInteger("heartbeat.timeout",5000)
}


configuration
  }


}



> On March 3, 2019, at 9:05 PM, 刘 文 wrote:
> 
> 
> [Question] In Flink parallel computation, how do the different windows receive the data of their own partition, i.e. how does a window determine which partition it belongs to?
> 
> ). Environment: Flink 1.7.2 WordCount local, stream processing.
> ). In the source, RecordWriter.emit() assigns each element to a partition by key, so the partition of every element is already determined; the number of partitions is decided by DataStream.setParallelism(2).
> 
>   public void emit(T record) throws IOException, InterruptedException {
>      emit(record, channelSelector.selectChannels(record, numChannels));
>   }
> 
>   copyFromSerializerToTargetChannel(int targetChannel) then writes the data to the different channels, i.e. sends it to the window corresponding to each partition (records are sent one at a time).
> ). As many windows are started as there are parallel instances (DataStream.setParallelism(2)).
>   



[Question] In Flink parallel computation, how do the different windows receive the data of their own partition, i.e. how does a window determine which partition it belongs to?

2019-03-03 post


). Environment: Flink 1.7.2 WordCount local, stream processing.
). In the source, RecordWriter.emit() assigns each element to a partition by key, so the partition of every element is already determined; the number of partitions is decided by DataStream.setParallelism(2).

public void emit(T record) throws IOException, InterruptedException {
    emit(record, channelSelector.selectChannels(record, numChannels));
}

copyFromSerializerToTargetChannel(int targetChannel) then writes the data to the different channels, i.e. sends it to the window corresponding to each partition (records are sent one at a time).
). As many windows are started as there are parallel instances (DataStream.setParallelism(2)).


Subscribe

2019-03-01 post
Subscribe




