回复: Re: 大佬们, keyby()两次, 然后再window(), 会有几个窗口?

2021-02-22 Thread hdxg1101300123

为什么flatmap就是2个


发自vivo智能手机
> 不对,看了你描述没看代码。你代码那么写的化是2个哈。因为你keyBy后做了flatMap,再keyBy就是另外一个了哈。 
>
> yidan zhao  于2021年2月22日周一 上午10:31写道: 
>
> > 只有最后一个keyBy有效。 
> > 
> > Hongyuan Ma  于2021年2月21日周日 下午10:59写道: 
> > 
> >> 大佬们, 如果keyby两次然后再调用window()的话是只根据最后一次keyby的键生成n个窗口, 
> >> 还是在前一次keyby的基础上生成m*n个窗口? 
> >> 
> >> 
> >> 像下面这样写, 最后的窗口是只按area划分的吗? 
> >> // 我想对不同车(id)的轨迹进行预测, 然后统计各个区域(area)内的轨迹信息 
> >> stream.keyby("id") 
> >> .flatmap() // 根据id 对轨迹进行预测, 在里面使用key state 
> >> .assignTime() // 修改轨迹eventTime为预测出的时间 
> >> .keyby("area") 
> >> .window() // 根据区域划分窗口 
> >> .process() // 统计各个区域内的轨迹 
> >> 
> >> 


回复: Re: taskmanager.out配置滚动

2020-12-22 Thread hdxg1101300123
1.11可以



发自vivo智能手机
> 之前在社区我提过一次redirect的方案,但其他人有一些concerns,可以参考一下 
>
> https://github.com/apache/flink/pull/11839#pullrequestreview-399769862 
>
> zilong xiao  于2020年12月22日周二 下午4:13写道: 
>
> > 恩恩,这个场景是有的,目前看是可以通过重定向后实现,follow issue~ 
> > 
> > 李杰  于2020年12月22日周二 下午3:58写道: 
> > 
> > > Hi, 
> > > 这个功能我们之前做过,可以看下这里。 
> > > https://issues.apache.org/jira/browse/FLINK-20713 
> > > 
> > > zilong xiao  于2020年12月3日周四 下午7:50写道: 
> > > 
> > > > 想问下社区的大佬,标准输出文件taskmanager.out可以配置成滚动的吗? 
> > > > 
> > > 
> > 


转发: 两条流去重后再关联出现不符合预期数据

2020-12-15 Thread hdxg1101300123
-- 转发的邮件 --
发件人:hdxg1101300123 
日期:2020年12月15日 10:36
主题:两条流去重后再关联出现不符合预期数据
收件人:user-zh 
抄送:

> 你好: 
>     我在使用flink 
> 1.11.2版本的时候使用flinksql处理两条流。因为两条流都是数据库变更信息,我需要取最新的数据关联;所以分别对两条流做row_number=1 
> (SELECT [column_list] FROM ( 
>    SELECT [column_list],
>  ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
>    ORDER BY time_attr [asc|desc]) AS rownum
>    FROM table_name) 
> WHERE rownum = 1) 
> 去重后再左关联; 
> 前期当左流变更都没有问题,结果符合预期;当右流有数据时,第一条数据也符合预期,但是右流在发送一条数据出现变更时,出现了一条不符合预期的数据; 
>     
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,null,null,null,null) 
> right> (true,3774bca649224249bdbcb8e7c80b52f9,1,0,8,160793279) 
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,null,null,null,null) 
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279) 
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279) 
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279)
>  
> right> (false,3774bca649224249bdbcb8e7c80b52f9,1,0,8,160793279) 
> right> (true,3774bca649224249bdbcb8e7c80b52f9,1,0,1,1607933006000) 
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279)
>  
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,1,1607933006000)
>  
>
> 第1行左流来了数据显示true,此时右流没有数据结果是null; 
> 第2行右流来了数据,显示为true(单独打印了右流的结果); 
> 第3行显示左流撤回; 
> 第4行 左右流数据关联上,正常显示; 
> 第5行 左流数据变更,数据撤回; 
> 第6行 显示变更后的数据; 
> 第7行 右流数据变化,数据撤回; 
> 第8行 显示右流最新的结果; 
> 第9行 因为右流数据变化 所以左流(关联数据)撤回; 
> 第10行 和第11 行 不符合预期; 
> 正常应该是 右流发生变化 第9行 因为右流数据变化 所以左流(关联数据)撤回;然后右流的最新数据和左流产生结果;显示第12行数据才对; 
> 所以想请教一下大家; 
>
> 1607998361520> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
> 1607998361520> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
>
> 我的sql语句如下 
> String sql = "SELECT a.sheetId 
> sheetId,sheetCode,sheetStatus,sheetType,sheetScene,sheetObject," + 
>     " 
> sheetPresentation,sheetAcceptor,sheetHandler,updateTime,dealTaskId,provided,satisfied,score,operateTime
>  " + 
>     " from (SELECT 
> sheetId,sheetCode,sheetStatus,sheetType,sheetScene,sheetObject," + 
>     " sheetPresentation,sheetAcceptor,sheetHandler,updateTime,dealTaskId" 
> + 
>     " FROM (SELECT *," + 
>     " ROW_NUMBER() OVER (PARTITION BY sheetId ORDER BY operateTime 
> desc) AS rownum " + 
>     "   FROM sheetMain)" + 
>     " WHERE rownum = 1 ) a" + 
>     " left JOIN " + 
>     " (select sheetId,provided,satisfied,score,operateTime from (SELECT 
> *," + 
>     " ROW_NUMBER() OVER (PARTITION BY sheetId ORDER BY operateTime 
> desc) AS rownum " + 
>     "   FROM sheetAnswers)" + 
>     " WHERE rownum = 1 ) c" + 
>     " ON a.sheetId = c.sheetId " ; 
>
>
>
> hdxg1101300...@163.com 


回复: Flink与Yarn的状态一致性问题

2020-11-12 Thread hdxg1101300123
可以设置检查点失败任务也失败



发自vivo智能手机
> hi everyone,
>
> 最近在使用Flink-1.11.1 On Yarn Per 
> Job模式提交简单的kafka->mysql任务时,发现当有脏数据或mysql主键限制等等原因导致Flink任务状态置为Failed时,Yarn 
> application仍处于运行状态
>
> 疑问是Flink任务处于Failed或Finished时,不会反馈自身状态给Yarn吗?期待大佬解惑,谢谢
>
> best,
> amenhub