答复: 关于elasticSearch table sink 构造过于复杂

2019-08-26 文章 aven . wu
你好: 可以自己构建 indexRequest 设置id,type,source 等字段 ElasticsearchSinkFunction 不知道是否满足你的需求? 发件人: Jark Wu 发送时间: 2019年8月26日 18:00 主题: Re: 关于elasticSearch table sink 构造过于复杂 > ETL作业, 能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的. 据我所知,目前是不支持的。 可以去建个 JIRA 给社区提需求。 如果使用的 blink planner,可以使用 deduplicate

Re: 关于flink状态后端使用Rocksdb序列化问题

2019-08-26 文章 Jark Wu
I think Congxian is right. POJO Schema Evolution is the feature what you want. Best, Jark > 在 2019年8月26日,21:52,Congxian Qiu 写道: > > hi, 你看以看一下 1.8 开始支持的 POJO Scheme Evolution[1] 是否满足你的需求,你需要注意的是如何满足 Flink > 中判断 POJO 的定义 [2] > [1] >

flink1.9中关于blink的文档在哪看呀

2019-08-26 文章 rockey...@163.com
hi,all flink1.9中关于blink的文档在哪看呀?找了半天找不到 0.0 rockey...@163.com Have a good day !

????flink??kafka??????????????????

2019-08-26 文章 1900
flink on yarn?? flink??1.7.2??hadoop??2.8.5??kafka??1.0.0 kafkaflinkkafka??offset ?? Properties props = new Properties(); props.put("auto.offset.reset", "latest"); Schema(), props));DataStream data = env.addSource(new

Re: flink1.9中关于blink的文档在哪看呀

2019-08-26 文章 Zili Chen
Blink 的文档应该都在 [1] 了,并没有跟着 Flink 版本变化而变化的意思呀(x Best, tison. [1] https://github.com/apache/flink/blob/blink/README.md rockey...@163.com 于2019年8月27日周二 上午10:18写道: > > hi,all > flink1.9中关于blink的文档在哪看呀?找了半天找不到 0.0 > > > rockey...@163.com > Have a good day ! >

Re: 关于row number over的用法

2019-08-26 文章 Jark Wu
Hi 你的 query 并不是 topn 语法。 可以先看这篇文档了解 topn 语法: http://blink.flink-china.org/dev/table/sql.html#topn Best, Jark > 在 2019年8月26日,19:02,ddwcg <3149768...@qq.com> 写道: > > > 文档上还没有更新topN怎么使用,我尝试用row_number() over()

Re: flink1.9中关于blink的文档在哪看呀

2019-08-26 文章 Jark Wu
Blink 合并到 flink 后,是作为一种 planner 的实现存在,所以文档是和 flink 在一起的。 如何使用 blink planner,可以看这里:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html#create-a-tableenvironment

Re: FLINK TABLE API 自定义聚合函数UDAF从check point恢复任务报状态序列化不兼容问题( For heap backends, the new state serializer must not be incompatible)

2019-08-26 文章 orlando qi
没有改变,我主要是来测试恢复任务是不是成功。 import java.lang.{Double => JDouble, Long => JLong, String => JString} import com.vrv.bigdata.scala.datetime.DateTimeUtil import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} import org.apache.flink.table.functions.AggregateFunction /** * 自定义聚合函数:更新列值

实时计算占比的问题

2019-08-26 文章 ddwcg
一张表做自关联,然后求两个粒度的占比;比如下面求月销数额占年销售额的比例,appendTable是从上游用滚动窗口计算的流注册的,tumble_end_time是某个窗口的结束时间,请问这样inner join 会丢数据吗?有更好的方法计算占比吗? select months,monthAmount/yearAmount as amountRate from (select months,years,amount as monthAmount,tumble_end_time from appendTable) a join (select

Re: 关于elasticSearch table sink 构造过于复杂

2019-08-26 文章 Jark Wu
> ETL作业, 能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的. 据我所知,目前是不支持的。 可以去建个 JIRA 给社区提需求。 如果使用的 blink planner,可以使用 deduplicate with keeping first row,是一个比较轻量的去重计算,能拿到一个 key (也就是去重 key)。 文档还在 review 中,可以先看这个PR:

Re: flink 1.9 消费kafka报错

2019-08-26 文章 Jark Wu
看起来是你依赖了一个老版本的 EnvironmentSettings,可能是本地 mvn cache 导致的。 可以尝试清空下 “~/.m2/repository/org/apache/flink/flink-table-api-java” 目录。 Best, Jark > 在 2019年8月26日,17:56,ddwcg <3149768...@qq.com> 写道: > > 都加了,还是不行,下面是我的pom文件和 libraires的截图 > > > > apache.snapshots > Apache Development

Re: flink 1.9 消费kafka报错

2019-08-26 文章 ddwcg
谢谢您的耐心解答,是本地cache的问题,已经解决 > 在 2019年8月26日,17:56,ddwcg <3149768...@qq.com> 写道: > > 都加了,还是不行,下面是我的pom文件和 libraires的截图 > > > > apache.snapshots > Apache Development Snapshot Repository > https://repository.apache.org/content/repositories/snapshots/ >

关于row number over的用法

2019-08-26 文章 ddwcg
文档上还没有更新topN怎么使用,我尝试用row_number() over() 跑了一下,但是报错,请问topN可以是RetractStream吗? val monthstats = bsTableEnv.sqlQuery( """ |select |id,province,amount, |row_number() over(partition by id,province order by amount ) as rn |from mytable where type=1 |group by

Re:Re:回复: Re: flink1.9 blink planner table ddl 使用问题

2019-08-26 文章 hb
感谢,解决了, 指定 'connector.version' = '0.11' 就可以了. Blink SQL这方面的官方资料和文档好少啊,开发容易遇到问题. 在 2019-08-26 14:26:15,"hb" <343122...@163.com> 写道: >kafka版本是 kafka_2.11-1.1.0, >支持的kafka版本有哪些 >在 2019-08-26 14:23:19,"pengcheng...@bonc.com.cn" 写道: >>检查一下代码的kafka版本,可能是这方面的错误 >> >> >> >>pengcheng...@bonc.com.cn >>

Elasticsearch6UpsertTableSink 的构造方法过于复杂

2019-08-26 文章 巫旭阳
public Elasticsearch6UpsertTableSink( boolean isAppendOnly, TableSchema schema, List hosts, String index, String docType, String keyDelimiter, String keyNullLiteral, SerializationSchema serializationSchema, XContentType contentType,

Re:Re: 关于elasticSearch table sink 构造过于复杂

2019-08-26 文章 hb
没有group by的语句,比如就是select * from table ,表明细数据,以DDL 方式 写入 es, 能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的. 在 2019-08-26 15:47:53,"Jark Wu" 写道: >嗯,descriptor 和 DDL 就是可以用于这个场景,将 table 查询结果直接写入 sink。 > >Best, >Jark > > > >> 在 2019年8月26日,16:44,巫旭阳 写道: >> >> 感谢解答, >> 我的意图是 构建EStablesink,可以将table

Re:Re: 关于elasticSearch table sink 构造过于复杂

2019-08-26 文章 巫旭阳
感谢解答, 我的意图是 构建EStablesink,可以将table 查询的结果 直接写入ES 避免再转换DataStream 通过ESSink写入 在 2019-08-26 16:39:49,"Jark Wu" 写道: >Hi , > > >Elasticsearch6UpsertTableSink 是标记成 @internal 的,不是开放给用户直接去构造的。 >如果要注册一个 ES sink,可以使用 descriptor API,也就是 >org.apache.flink.table.descriptors.Elasticsearch。 >或者使用 DDL

Re: 关于elasticSearch table sink 构造过于复杂

2019-08-26 文章 Jark Wu
嗯,descriptor 和 DDL 就是可以用于这个场景,将 table 查询结果直接写入 sink。 Best, Jark > 在 2019年8月26日,16:44,巫旭阳 写道: > > 感谢解答, > 我的意图是 构建EStablesink,可以将table 查询的结果 直接写入ES 避免再转换DataStream 通过ESSink写入 > > > > > > > 在 2019-08-26 16:39:49,"Jark Wu" 写道: >> Hi , >> >> >> Elasticsearch6UpsertTableSink 是标记成

Re: flink 1.9 消费kafka报错

2019-08-26 文章 ddwcg
hi,我指定了使用blinkplanner,还是报一样的错 object StreamingJob { def main(args: Array[String]) { val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val bsTableEnv =

Re: 关于flink状态后端使用Rocksdb序列化问题

2019-08-26 文章 Congxian Qiu
hi, 你看以看一下 1.8 开始支持的 POJO Scheme Evolution[1] 是否满足你的需求,你需要注意的是如何满足 Flink 中判断 POJO 的定义 [2] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/schema_evolution.html [2]