------------------------------------------------------
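Sorry, I still don't understand. May I ask for help once more?
For example:
). With parallelism set to 2 via setParallelism(2), two window threads are created
). Streaming WordCount, local, Flink 1.7.2
). How does each of these two window threads read the data of its own partition, and how is a window's partition determined?
). Input data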
1 2 3 4 5 6 7 8 9 10
).source -> operator ->
------------------
channel 0 [partition 0]
key:1 partition:0
key:2 partition:0
key:3 partition:0
key:4 partition:0
key:6 partition:0
key:10 partition:0
------------------
channel 1 [partition 1]
key:5 partition:1
key:7 partition:1
key:8 partition:1
key:9 partition:1
). window 0 (1/2)
How is this window's partition determined?
How does this window read the data in its partition?
). window 1 (2/2)
How is this window's partition determined?
How does this window read the data in its partition?
------------------------------------------------------
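One way to picture the key-to-partition step asked about above is sketched below. It is only a minimal sketch modeled on the key-group idea (hash the key into one of maxParallelism key groups, then map that group to a subtask index). The class name KeyGroupSketch, its method names, the value 128, and the hash mixing are illustrative assumptions, not Flink's actual code, so the partitions it prints need not match the listing above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative sketch only; names and hash mixing are assumptions, not Flink's code. */
final class KeyGroupSketch {

    /** Hash a key into one of maxParallelism key groups (stand-in mixing function). */
    static int keyGroupFor(Object key, int maxParallelism) {
        int h = key.hashCode();
        h ^= (h >>> 16);   // fold the high bits into the low bits
        h *= 0x85ebca6b;   // multiply by an odd constant to scramble further
        h ^= (h >>> 13);
        return Math.floorMod(h, maxParallelism);
    }

    /** Map a key group to the index of the parallel subtask that owns it. */
    static int subtaskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Full path: key -> key group -> subtask (window instance) index. */
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return subtaskForKeyGroup(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value for this sketch
        int parallelism = 2;      // two window subtasks, as in the question
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String key : "1 2 3 4 5 6 7 8 9 10".split("\\s")) {
            int partition = subtaskFor(key, maxParallelism, parallelism);
            byPartition.computeIfAbsent(partition, p -> new ArrayList<>()).add(key);
        }
        byPartition.forEach((p, keys) -> System.out.println("partition " + p + ": " + keys));
    }
}
```

In this sketch the same key always lands on the same subtask index, because the mapping depends only on the key's hash and the two parallelism values. That is the property a keyed window relies on.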
> On March 3, 2019, at 9:26 PM, 刘 文 <[email protected]> wrote:
>
> WordCount.scala
> package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.parallelism
>
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.windowing.time.Time
>
> /**
> * Input data is typed into: nc -lk 1234
> */
> object SocketWindowWordCountLocal {
>
>
>
> def main(args: Array[String]): Unit = {
>
>
> val port = 1234
> // get the execution environment
> // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
>
>
> val configuration : Configuration = getConfiguration(true)
>
> val env:StreamExecutionEnvironment =
> StreamExecutionEnvironment.createLocalEnvironment(1,configuration)
>
>
>
>
>
> // get input data by connecting to the socket
> val dataStream = env.socketTextStream("localhost", port, '\n')
>
>
>
> import org.apache.flink.streaming.api.scala._
> val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w =>
> WordWithCount(w,1))
> .keyBy("word")
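> /**
> * The window fires every 5 seconds, so counting effectively restarts with each window.
> * Benefit: there is no need to keep aggregating over all data ever received;
> * only the incremental data within the given interval is processed, which reduces the data volume.
> */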
> .timeWindow(Time.seconds(5))
> //.countWindow(3)
> //.countWindow(3,1)
> //.countWindowAll(3)
>
>
> .sum("count" )
>
> textResult
> .setParallelism(100)
> .print()
>
>
>
>
> if(args == null || args.size ==0){
>
>
>
> println("==================================Execution plan below==================================")
> println("Visualizer URL (works best in Firefox): https://flink.apache.org/visualizer")
> // execution plan
> println(env.getExecutionPlan)
> println("==================================Execution plan JSON above==================================\n")
> //StreamGraph
> //println(env.getStreamGraph.getStreamingPlanAsJSON)
>
>
>
> //JsonPlanGenerator.generatePlan(jobGraph)
>
> env.execute("default job")
>
> }else{
> env.execute(args(0))
> }
>
> println("done")
>
> }
>
>
> // Data type for words with count
> case class WordWithCount(word: String, count: Long)
>
>
> def getConfiguration(isDebug:Boolean = false):Configuration = {
>
> val configuration : Configuration = new Configuration()
>
> if(isDebug){
> val timeout = "100000 s"
> val timeoutHeartbeatPause = "1000000 s"
> configuration.setString("akka.ask.timeout",timeout)
> configuration.setString("akka.lookup.timeout",timeout)
> configuration.setString("akka.tcp.timeout",timeout)
> configuration.setString("akka.transport.heartbeat.interval",timeout)
>
> configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
> configuration.setString("akka.watch.heartbeat.pause",timeout)
> configuration.setInteger("heartbeat.interval",10000000)
> configuration.setInteger("heartbeat.timeout",50000000)
> }
>
>
> configuration
> }
>
>
> }
>
>
>
>> On March 3, 2019, at 9:05 PM, 刘 文 <[email protected]> wrote:
>>
>>
>> [Question] In Flink parallel computation, how does each window receive the data of its own partition, i.e. how does a window determine which partition it belongs to?
>>
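>> ). Environment: Flink 1.7.2, WordCount, local, stream processing
>> ). In the source, RecordWriter.emit() assigns each element to a partition by key, so every element's partition is already fixed at that point; the number of partitions is determined by DataStream.setParallelism(2)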
>>
>> public void emit(T record) throws IOException, InterruptedException {
>> emit(record, channelSelector.selectChannels(record, numChannels));
>> }
>>
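>> copyFromSerializerToTargetChannel(int targetChannel) then writes the data to the selected channel,
>> i.e. sends it to the window of the corresponding partition (records are sent one at a time)
>> ). One window is started per unit of parallelism, e.g. DataStream.setParallelism(2) starts two windows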
>>
>
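The emit path described in the quoted message can be pictured with the minimal sketch below. It is not Flink's RecordWriter. The class RecordWriterSketch, the nested ChannelSelector interface, the in-memory queues standing in for channels, and the hash-modulo selector are all assumptions made for illustration. It only shows the shape of the idea: the sender picks a channel per record, and each downstream window subtask reads nothing but its own channel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Illustrative sketch only; this is not Flink's RecordWriter. */
final class RecordWriterSketch<T> {

    /** Hypothetical selector: returns the target channel indexes for a record. */
    interface ChannelSelector<R> {
        int[] selectChannels(R record, int numChannels);
    }

    private final ChannelSelector<T> channelSelector;
    private final List<Queue<T>> channels = new ArrayList<>();

    RecordWriterSketch(ChannelSelector<T> channelSelector, int numChannels) {
        this.channelSelector = channelSelector;
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayDeque<>()); // one queue per downstream subtask
        }
    }

    /** Route one record at a time to the channel(s) picked by the selector. */
    void emit(T record) {
        for (int target : channelSelector.selectChannels(record, channels.size())) {
            channels.get(target).add(record);
        }
    }

    /** What the downstream subtask with this index would read: only its own channel. */
    Queue<T> channel(int subtaskIndex) {
        return channels.get(subtaskIndex);
    }

    public static void main(String[] args) {
        // Assumed selector for the sketch: hash of the record modulo the channel count.
        RecordWriterSketch<String> writer = new RecordWriterSketch<String>(
                (record, n) -> new int[] {Math.floorMod(record.hashCode(), n)}, 2);
        for (String word : "1 2 3 4 5 6 7 8 9 10".split("\\s")) {
            writer.emit(word);
        }
        for (int i = 0; i < 2; i++) {
            System.out.println("window " + i + " (" + (i + 1) + "/2) reads " + writer.channel(i));
        }
    }
}
```

In this picture the window subtask does not compute its partition itself. Its index is simply the index of the channel it is connected to, and the routing decision is made entirely on the sending side.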