如果直接查询表是没问题,但是业务需求是按汇总后的amount排序,所以有一个from子查询,请问有没有什么方法汇总后求topN
select id,province,amount,rn from(
select
id,province,amount,
row_number() over(partition by province order by amount desc ) as rn
from (
select id,province,sum(amount) amount from mytable group by id,province
)m
)a
一张表做自关联,然后求两个粒度的占比;比如下面求月销数额占年销售额的比例,appendTable是从上游用滚动窗口计算的流注册的,tumble_end_time是某个窗口的结束时间,请问这样inner
join 会丢数据吗?有更好的方法计算占比吗?
select months,monthAmount/yearAmount as amountRate from
(select months,years,amount as monthAmount,tumble_end_time from appendTable) a
join
(select
因为sink到hbase,使用一个column存了top5的list,sink前我要组合一下这个list
> 在 2019年8月28日,10:12,Jark Wu 写道:
>
> 为什么还需要后面接 process operator 呢? Flink TopN 已经帮你维护好了 state,直接输出到一个 update sink
> 中就可以了。
>
>
> Best,
> Jark
>
>> 在 2019年8月28日,10:08,ddwcg <3149768...@qq.com> 写道:
>>
>> process
>
broker就一个,flink集群的时钟确实和broker的不一样,是不同的机房,能自己指定transactionalId吗,两个机房的调整成一样怕影响其他的应用
> 在 2019年8月29日,17:45,Wesley Peng 写道:
>
> Hi
>
> on 2019/8/29 17:13, ddwcg wrote:
>> 作业我已经停止了,但是看kafka的日志还是不断的在刷Initialized transactionalId………. ,而且
hi,
在写入kafka的时候自动生成了一个transactionId,请问这个id生成的方式是什么,我自己指定好像并不起作用。
作业我已经停止了,但是看kafka的日志还是不断的在刷Initialized transactionalId………. ,而且该程序再此启动就会报:
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
attempted an operation with an old epoch. Either there is a newer producer with
vironment
>
> 应该是意外 import 了不同包下的同名类的缘故
>
> Best,
> tison.
>
>
> ddwcg <3149768...@qq.com> 于2019年8月26日周一 上午11:12写道:
>
>> 大家好,
>> 我周末升级到了1.9.0,但是在初始化table env的时候编译不通过,请大家帮忙看看是什么原因,
t;
>> 不应该呀,我看到仍然有
>>
>> def registerDataStream[T](name: String, dataStream: DataStream[T], fields:
>> Expression*): Unit
>>
>> 这个方法的,你能提供完整一点的上下文和报错吗?
>>
>> Best,
>> tison.
>>
>>
>> ddwcg <3149768...@qq.com <
谢谢您的耐心解答,是本地cache的问题,已经解决
> 在 2019年8月26日,17:56,ddwcg <3149768...@qq.com> 写道:
>
> 都加了,还是不行,下面是我的pom文件和 libraires的截图
>
>
>
> apache.snapshots
> Apache Development Snapshot Repository
> https://repository.apache.org/content/
文档上还没有更新topN怎么使用,我尝试用row_number() over() 跑了一下,但是报错,请问topN可以是RetractStream吗?
val monthstats = bsTableEnv.sqlQuery(
"""
|select
|id,province,amount,
|row_number() over(partition by id,province order by amount ) as rn
|from mytable where type=1
|group by
/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment>
>
>
> Best,
> Jark
>
>> 在 2019年8月26日,14:56,ddwcg <3149768...@qq.com> 写道:
>>
>> 大家好,
>> 升级到1.9后有几个问题:
>> 1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011
>>
hi,
请问在作业启动时设置的并行度,和后面算子的并行度是什么关系?
比如启动时设置为1,map算子设置为5,map(...).setParallelism(5),这个算子并行度的设置会起作用吗(因为它大于全局设置的1)?
启动时设置的并行数是slot的个数,每个slot的资源是固定的(比如是1G内存),那么后面的算子增加了并行度,资源怎么划分?
另外flink sql的并行度是怎么设置的,我没有发现有setParallelism,只有一个最大并行度的设置:setMaxParallelism()
谢谢
谢谢您的回复,那如果启动的时候只给了一个solt,算子并行度设置为2,最终也是按并行度为1去执行呢
> 在 2019年8月29日,10:54,pengcheng...@bonc.com.cn 写道:
>
> 你好,以我的理解,并行度的优先级setParallelism>命令>配置文件。
> 每个算子有多个并行度的话,每个并行度占一个slot。
> flink sql无法设置并行度。
>
>
>
> pengcheng...@bonc.com.cn
>
> 发件人: ddwcg
> 发送时间: 2
12 matches
Mail list logo