@thinktothings

不知道是否我理解正确,我觉得你可以对flink的一些计算流程不是很清楚。 SumAggregator内的reduce
方法就可以计算出需要的“sum”结果。

你一直问什么时候调用sum,是指代码中的“sum("count")”吗?这个在构建steamgraph的时候已经调用了,目的就是获取内部返回的SumAggregator对象。

有个文章可以了解下。看下第一次即可:https://www.jianshu.com/p/13070729289c

如果我理解的不对,请忽略

On Thu, Feb 28, 2019 at 2:57 PM <[email protected]> wrote:

> --------------------------------------
>
> ).本地环境: scala WordCount ,程序在附件中 SocketWindowWordCountLocal.scala
> ).输入数据:
>    a b a
> ).设置的 timeWindow(Time.seconds(20))
> ).[问题]想调试Flink源码中具体在哪一步进行sum操作
> -------------------------------------------------
>
> 调试:
> ).RecordWriter.emit(),这个时候,数据是已经flatMap,map之后的值,
> 函数中会一条一条数据发送(a,1),(b,1),(a,1)
> ).调用 StreamSink.processElement 函数打印输出结果
> ).没明白地方,是在调用StreamSink.processElement之前,在哪个地方调用了sum,对相同key进行了聚合操作
>
>
> On Thursday, 28 February 2019, 2:47:39 pm GMT+8, Yaoting Gong <
> [email protected]> wrote:
>
>
> 你好。
> 我不知道你的是什么项目的代码。我从flink 官方的样例代码 SocketWindowWordCount.scala找到。
> 从sum跟进去,最终能找到一个SumAggregator。
>
> On Thu, Feb 28, 2019 at 2:34 PM <[email protected]> wrote:
>
> >  Flink wordCount
> >
> 本地程序,代码如下,想调下代码,没找到Window,的时间Trigger结束时,在哪个地方进行的sum,统计结果,我想关注这个点的问题,请问能详细的说明下吗?-------------------------------------package
> > com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc
> >
> > import org.apache.flink.configuration.Configuration
> > import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> > import org.apache.flink.streaming.api.windowing.time.Time
> >
> > /**
> >  * nc -lk 1234  输入数据
> >  */
> > object SocketWindowWordCountLocal {
> >
> >  def main(args: Array[String]): Unit = {
> >
> >
> >    val port = 1234
> >    // get the execution environment
> >    // val env: StreamExecutionEnvironment =
> > StreamExecutionEnvironment.getExecutionEnvironment
> >
> >
> >    val configuration : Configuration = new Configuration()
> >    val timeout = "100000 s"
> >    val timeoutHeartbeatPause = "1000000 s"
> >    configuration.setString("akka.ask.timeout",timeout)
> >    configuration.setString("akka.lookup.timeout",timeout)
> >    configuration.setString("akka.tcp.timeout",timeout)
> >    configuration.setString("akka.transport.heartbeat.interval",timeout)
> >
> >
> configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
> >    configuration.setString("akka.watch.heartbeat.pause",timeout)
> >    configuration.setInteger("heartbeat.interval",10000000)
> >    configuration.setInteger("heartbeat.timeout",50000000)
> >    val env:StreamExecutionEnvironment =
> > StreamExecutionEnvironment.createLocalEnvironment(1,configuration)
> >
> >
> >
> >
> >
> >    // get input data by connecting to the socket
> >    val dataStream = env.socketTextStream("localhost", port, '\n')
> >
> >
> >
> >    import org.apache.flink.streaming.api.scala._
> >    val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w =>
> > WordWithCount(w,1))
> >      .keyBy("word")
> >      /**
> >        * 每20秒刷新一次,相当于重新开始计数,
> >        * 好处,不需要一直拿所有的数据统计
> >        * 只需要在指定时间间隔内的增量数据,减少了数据规模
> >        */
> >      .timeWindow(Time.seconds(20))
> >      //.countWindow(3)
> >      //.countWindow(3,1)
> >      //.countWindowAll(3)
> >
> >
> >      .sum("count" )
> >
> >    textResult.print().setParallelism(1)
> >
> >
> >
> >    if(args == null || args.size ==0){
> >      env.execute("默认作业")
> >
> >      //执行计划
> >      //println(env.getExecutionPlan)
> >      //StreamGraph
> >      //println(env.getStreamGraph.getStreamingPlanAsJSON)
> >
> >
> >
> >      //JsonPlanGenerator.generatePlan(jobGraph)
> >
> >    }else{
> >      env.execute(args(0))
> >    }
> >
> >    println("结束")
> >
> >  }
> >
> >
> >  // Data type for words with count
> >  case class WordWithCount(word: String, count: Long)
> >
> > }
> >
> >
> >    On Thursday, 28 February 2019, 2:08:00 pm GMT+8, Yaoting Gong <
> > [email protected]> wrote:
> >
> >  @Yuan Yifan
> >
> > *不能贴图的。*
> >
> > On Thu, Feb 28, 2019 at 2:03 PM Yuan Yifan <[email protected]> wrote:
> >
> > >
> > > 你说的应该是这里的代码:
> > >
> > >
> > >
> >
> http://flink-cn.shinonomelab.com/quickstart/setup_quickstart.html#read-the-code
> > >
> > > 其实SUM应该会在每一条数据来的时候调用的,但是输出结果只有在最后FireAndPurge的时候。
> > >
> > > 本质上,sum是执行了一个Sum类型的Aggregate:
> > > 其AggregateFunction是:
> > >
> > >
> >
> org.apache.flink.streaming.api.functions.aggregation.SumAggregator#SumAggregator(int,
> > > org.apache.flink.api.common.typeinfo.TypeInformation<T>,
> > > org.apache.flink.api.common.ExecutionConfig)
> > >
> > >
> > > 其中实现了reduce方法:
> > >
> > > 所以你可以不必关心究竟是在何时计算的,有可能在多个地方计算以后再合并,但是如论如何,Reduce计算的性质保证,结果一定是对的。
> > >
> > >
> > >
> > > 在 2019-02-28 13:04:59," " <[email protected]> 写道:
> > > >请问:  flink wordcount中      sum是在什么时候,哪个地方调用的?
> > >
> > >
> > >
> > >
> > >
> >
>

回复