在子查询上使用row_number over返回的rn都是1

2019-08-27 文章 ddwcg
如果直接查询表是没问题,但是业务需求是按汇总后的amount排序,所以有一个from子查询,请问有没有什么方法汇总后求topN select id,province,amount,rn from( select id,province,amount, row_number() over(partition by province order by amount desc ) as rn from ( select id,province,sum(amount) amount from mytable group by id,province )m )a

实时计算占比的问题

2019-08-26 文章 ddwcg
一张表做自关联,然后求两个粒度的占比;比如下面求月销数额占年销售额的比例,appendTable是从上游用滚动窗口计算的流注册的,tumble_end_time是某个窗口的结束时间,请问这样inner join 会丢数据吗?有更好的方法计算占比吗? select months,monthAmount/yearAmount as amountRate from (select months,years,amount as monthAmount,tumble_end_time from appendTable) a join (select

Re: 在子查询上使用row_number over返回的rn都是1

2019-08-27 文章 ddwcg
因为sink到hbase,使用一个column存了top5的list,sink前我要组合一下这个list > 在 2019年8月28日,10:12,Jark Wu 写道: > > 为什么还需要后面接 process operator 呢? Flink TopN 已经帮你维护好了 state,直接输出到一个 update sink > 中就可以了。 > > > Best, > Jark > >> 在 2019年8月28日,10:08,ddwcg <3149768...@qq.com> 写道: >> >> process >

Re: 关于flink 写于kafka时的transactionId 生成问题

2019-08-29 文章 ddwcg
broker就一个,flink集群的时钟确实和broker的不一样,是不同的机房,能自己指定transactionalId吗,两个机房的调整成一样怕影响其他的应用 > 在 2019年8月29日,17:45,Wesley Peng 写道: > > Hi > > on 2019/8/29 17:13, ddwcg wrote: >> 作业我已经停止了,但是看kafka的日志还是不断的在刷Initialized transactionalId………. ,而且

关于flink 写于kafka时的transactionId 生成问题

2019-08-29 文章 ddwcg
hi, 在写入kafka的时候自动生成了一个transactionId,请问这个id生成的方式是什么,我自己指定好像并不起作用。 作业我已经停止了,但是看kafka的日志还是不断的在刷Initialized transactionalId………. ,而且该程序再此启动就会报: Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with

Re: flink 1.9.0 StreamTableEnvironment 编译错误

2019-08-25 文章 ddwcg
vironment > > 应该是意外 import 了不同包下的同名类的缘故 > > Best, > tison. > > > ddwcg <3149768...@qq.com> 于2019年8月26日周一 上午11:12写道: > >> 大家好, >> 我周末升级到了1.9.0,但是在初始化table env的时候编译不通过,请大家帮忙看看是什么原因,

Re: flink 1.9.0 StreamTableEnvironment 编译错误

2019-08-25 文章 ddwcg
t; >> 不应该呀,我看到仍然有 >> >> def registerDataStream[T](name: String, dataStream: DataStream[T], fields: >> Expression*): Unit >> >> 这个方法的,你能提供完整一点的上下文和报错吗? >> >> Best, >> tison. >> >> >> ddwcg <3149768...@qq.com <

Re: flink 1.9 消费kafka报错

2019-08-26 文章 ddwcg
谢谢您的耐心解答,是本地cache的问题,已经解决 > 在 2019年8月26日,17:56,ddwcg <3149768...@qq.com> 写道: > > 都加了,还是不行,下面是我的pom文件和 libraires的截图 > > > > apache.snapshots > Apache Development Snapshot Repository > https://repository.apache.org/content/

关于row number over的用法

2019-08-26 文章 ddwcg
文档上还没有更新topN怎么使用,我尝试用row_number() over() 跑了一下,但是报错,请问topN可以是RetractStream吗? val monthstats = bsTableEnv.sqlQuery( """ |select |id,province,amount, |row_number() over(partition by id,province order by amount ) as rn |from mytable where type=1 |group by

Re: flink 1.9 消费kafka报错

2019-08-26 文章 ddwcg
/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment> > > > Best, > Jark > >> 在 2019年8月26日,14:56,ddwcg <3149768...@qq.com> 写道: >> >> 大家好, >> 升级到1.9后有几个问题: >> 1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011 >>

全局并行度和算子并行度的关系

2019-08-28 文章 ddwcg
hi, 请问在作业启动时设置的并行度,和后面算子的并行度是什么关系? 比如启动时设置为1,map算子设置为5,map(...).setParallelism(5),这个算子并行度的设置会起作用吗(因为它大于全局设置的1)? 启动时设置的并行数是slot的个数,每个slot的资源是固定的(比如是1G内存),那么后面的算子增加了并行度,资源怎么划分? 另外flink sql的并行度是怎么设置的,我没有发现有setParallelism,只有一个最大并行度的设置:setMaxParallelism() 谢谢

Re: 全局并行度和算子并行度的关系

2019-08-29 文章 ddwcg
谢谢您的回复,那如果启动的时候只给了一个solt,算子并行度设置为2,最终也是按并行度为1去执行呢 > 在 2019年8月29日,10:54,pengcheng...@bonc.com.cn 写道: > > 你好,以我的理解,并行度的优先级setParallelism>命令>配置文件。 > 每个算子有多个并行度的话,每个并行度占一个slot。 > flink sql无法设置并行度。 > > > > pengcheng...@bonc.com.cn > > 发件人: ddwcg > 发送时间: 2