Re: 关于状态TTL

2020-04-17 文章 Benchao Li
而且https://issues.apache.org/jira/browse/FLINK-15938 和 https://issues.apache.org/jira/browse/FLINK-16581 这两个issue现在已经都merge了,你也可以cherry-pick过去。 Benchao Li 于2020年4月17日周五 下午2:54写道: > 嗯,blink planner跟legacy planner是有一些实现上的差异。 > 如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordState

Re: 关于状态TTL

2020-04-17 文章 Benchao Li
lse { return StateTtlConfig.DISABLED; } } 酷酷的浑蛋 于2020年4月17日周五 下午2:47写道: > 我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期 > > > > > 在2020年4月17日 14:16,Benchao Li 写道: > 这是两个问题, > > - 状态只访问一次,可能不会清理。 > > > 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanup

Re: 关于状态TTL

2020-04-17 文章 Benchao Li
访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了 > > > > > 在2020年4月17日 13:07,Benchao Li 写道: > 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`, > 所以这个问题现在是不能完全避免了。 > 我已经建了一个jira[1]来跟踪和改进这一点。 > > [1] https://issues.apache.org/jira/browse/FLINK-17199 &

Re: 关于状态TTL

2020-04-16 文章 Benchao Li
: > 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个 > > > > > 在2020年4月15日 18:04,Benchao Li 写道: > Hi, > > 你用的是哪个版本呢? > 在1.9版本里面的确是有点问题,默认没有开启cleanup in background > [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。 > > [1] https://issues.apache.org/jira/browse/FLINK-15938 &

Re: 请问Flink-1.10.1 release可以在哪里下载?(无正文)

2020-04-16 文章 Benchao Li
Hi, Flikn 1.10.1还没有正式发布,暂时还没有地方可以直接下载。可以从源码直接编译一下~ samuel@ubtrobot.com 于2020年4月16日周四 下午3:04写道: > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: flink-1.10-sql 维表问题

2020-04-15 文章 Benchao Li
1.10.1最近正在准备发布,还有几个blocker的issue,应该快了。 1.11的话,应该还比较久,现在都还没有feature freeze。 如果你可以在master上复现这个问题的话,可以建一个issue。 111 于2020年4月16日周四 上午11:32写道: > Hi, > 是的,我都有修改. > 那我去jira里面重新开个issue? > > > 另外,1.10.1或者1.11大概什么时间发布呢?我已经合并了很多PR,现在的版本有点乱了。 > Best, > Xinghalo -- Benchao

Re: flink-1.10-sql 维表问题

2020-04-15 文章 Benchao Li
LogicalTableScan(table=[[tgou, collie, kafka_test1, source: > [Kafka011TableSource(id, a, b)]]]) > LogicalFilter(condition=[=($cor1.id, $0)]) > LogicalSnapshot(period=[$cor1.ts]) > LogicalTableScan(table=[[tgou, collie, hbase_test1, source: > [HBaseTableSource[schema=[rowkey, f], projectFields=null) > > > Best, > Xinghalo -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: flink-1.10-sql 维表问题

2020-04-15 文章 Benchao Li
ache.org/jira/browse/FLINK-16068 > Best, > Xinghalo > > > 在2020年04月15日 21:21,Benchao Li 写道: > 这个原因是维表join的时候需要使用的时间是*有处理时间属性*[1] 的。 > > [1] > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html > > 111 于2020年4月15日周三 下午9:08写

Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

2020-04-15 文章 Benchao Li
万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同 > 。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后 > 再对cStream进行keyBy-->timeWindow-->sum. > >> 我用fromcollection也是没有问题,但数据量很大时,就结果不对,每次运行的结果都有差异。 > >> 用dataStream.from

Re: flink-1.10-sql 维表问题

2020-04-15 文章 Benchao Li
; > > 所以现在只能使用connector内部提供的Lookup function手动注册成 table function,使用lateral table > xxx的语法来使用。 > > > Best, > Xinghalo > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: flink-1.10-sql 维表问题

2020-04-15 文章 Benchao Li
10-sql支持维表DDL吗,看社区文档好像mysql和hbase支持,但是需要什么字段显示声明为创建的表是维表呀? > > > > guaishushu1...@163.com > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: 关于状态TTL

2020-04-15 文章 Benchao Li
gt; sql: select * from test t join test2 t2 on t.a=t2.a > > 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明 -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:

Re: flink-sql-connector-elasticsearch7_2.11-1.10.0.jar

2020-04-15 文章 Benchao Li
.script.mustache.SearchTemplateRequest > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > ...

Re: flinksql如何控制结果输出的频率

2020-04-15 文章 Benchao Li
非常开心能够帮助到你~ 刘建刚 于2020年4月15日周三 下午3:57写道: > 感谢 Benchao,问题应解决了! > > 2020年4月15日 下午3:38,Benchao Li 写道: > > Hi 建刚, > > 现在Emit的原理是这样子的: > - *当某个key*下面来了第一条数据的时候,注册一个emit delay之后的*处理时间定时器*; > - 当定时器到了的时候, > - 检查当前的key下的聚合结果跟上次输出的结果是否有变化, > - 如果有变化,就发送-[old], +[n

Re: flinksql如何控制结果输出的频率

2020-04-15 文章 Benchao Li
Env.toRetractStream(result, Row.class).print(); > > env.execute("IncrementalGrouping"); > } > > private static final class SourceData implements > SourceFunction> { > @Override > public void run(SourceContext> ctx) throws > Exception { > while (true) { >

Re: flink array 查询解析问题

2020-04-14 文章 Benchao Li
rows > * @return > */ > public String eval(Row [] rows){ > return JSONObject.toJSONString(rows); > } > > } -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: ddl array 创建问题求助

2020-04-09 文章 Benchao Li
me_line": {type:"object", > "properties": > {"rule_name": {type: "string"}}, > "count": {"type": "string"' > ); > > > > ERROR: > table field 'event_time_line' does not match with the physical type > ROW<`rule_name` STRING of the 'event_time_line' field of the > TableSource return type. -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: flink SQL 的OverWindow

2020-04-09 文章 Benchao Li
Hi 宇轩, 你指的是streaming sql么? 如果是的话,over window是可以支持事件时间和处理时间两种时间的。 可以基于rows也可以基于range,可以是bounded,也可以是unbounded。 韩宇轩 于2020年4月9日周四 下午6:10写道: > 请问Flink SQL中的OverWindow是基于什么时间窗口实现的 -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Em

Re: Flink SQL 1.10中ROW_NUMBER的使用

2020-04-06 文章 Benchao Li
able.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:56) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > 我看官网的TOPN例子中,order by 后面也可以是long型。难道row_number不能作用在窗口结果上? &

Re: 第一次统计信息延迟

2020-04-06 文章 Benchao Li
tificate_no)) OVER(PARTITION BY > terminal_only_no ORDER BY event_time RANGE BETWEEN INTERVAL '10' MINUTE > preceding AND CURRENT ROW) as login_frequency from TradeFlow > > 请大佬们帮忙看下,谢谢 > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University

Re: 回复: 回复: Flink双流Join问题

2020-04-04 文章 Benchao Li
> public String > nbsp;nbsp;nbsp; > amp;nbsp;amp;nbsp;amp;nbsp; amp;amp;gt; > join(Tuple3 Tuple3 nbsp;nbsp;nbsp; > amp;nbsp;amp;nbsp;amp;nbsp; amp;amp;gt; > String,Stringamp;amp;amp;amp;gt; value2) throws Exception { > nbsp;nbsp;nbsp; > amp;nbsp;amp;nbsp;amp;nbsp; a

Re: 回复: 回复: Flink双流Join问题

2020-04-04 文章 Benchao Li
p;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp; > public String > nbsp;nbsp;nbsp; amp;gt; > join(Tuple3 nbsp;nbsp;nbsp; amp;gt; > String,Stringamp;amp;amp;gt; value2) throws Exception { > nbsp;nbsp;nbsp; amp;gt; > amp;amp;nbsp;amp;amp;nb

Re: Flink双流Join问题

2020-04-04 文章 Benchao Li
} > }) > .window(TumblingEventTimeWindows.of(Time.seconds(3))) > .apply(new JoinFunction String,Stringgt;, Tuple3 Stringgt;() { > @Override > public String > join(Tuple3 String,Stringgt; value2) throws Exception { > return > value1.f1 + "=" + value2.f1; > } > }).print(); > 结果如下: > currentTimeStamp: > 100055000,Key:1,EventTime:100055000,前一条数据的水位线:0 > 4gt; (1,tom1,100055000) > currentTimeStamp: > 100056000,Key:1,EventTime:100056000,前一条数据的水位线:100055000 > 4gt; (1,tom2,100056000) > currentTimeStamp: > 100055000,Key:1,EventTime:100055000,前一条数据的水位线:0 > 3gt; (1,jerry1,100055000) > currentTimeStamp: > 100056000,Key:1,EventTime:100056000,前一条数据的水位线:100055000 > 3gt; (1,jerry2,100056000) > 2gt; tom1=jerry1 -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: 1585972779129_EWCE6K_[]R%C3V)5C{NJE)K

2020-04-04 文章 Benchao Li
=window_endtime.那么,这里应该是100057000 > 这条数据来了后才会触发,但是结果却是56000就触发了.为什么? > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: Flink双流Join问题

2020-04-04 文章 Benchao Li
水印的触发条件应该是watermark_time>=window_endtime.那么,这里应该是100057000这条数据来了后才会触发,但是结果却是56000就触发了.为什么? > > > 结果: > > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: flink sql 实现双流join 的滑动窗口

2020-04-02 文章 Benchao Li
n_requests_detail_view a > LEFT JOIN t1_oa_flow_inst_task_view_segment_no_5 f ON a.id = > f.business_id > LEFT JOIN t1_oa_flow_inst_task_view_segment_no_61 g ON a.id = > g.business_id > GROUP BY TUMBLE(a.ts, INTERVAL '10' MINUTE); > > 报错: > -- Benchao Li School of Electro

Re: Flink SQL1.10使用Mysql作为维表关联,数据量太大导致task manager time out

2020-04-01 文章 Benchao Li
.getSelectFromStatement( options.getTableName(), > returnType.getFieldNames(), new String[0]); > 构建sql的时候 conditionFields 是写死的空数组,因此肯定不会有查询条件参与进来。 > 后面真正open()连接查询的时候,也只处理了分区字段,并没有查询条件字段。 > 在2020年04月2日 10:11,Benchao Li 写道: > Hi, > > 能否把你的SQL也发出来呢? > > 正常来讲,维表关联用的是join的等值条件作为关

Re: Flink SQL1.10使用Mysql作为维表关联,数据量太大导致task manager time out

2020-04-01 文章 Benchao Li
api中,是使用异步API,根据某个条件单独查询某条数据,所以速度很快;现在需要全部加载过来,资源消耗太大了。 > 不知道有没有什么优雅的解决方案? > > > Best, > Xinghalo -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: Flink SQL中动态嵌套字段如何定义DDL

2020-03-31 文章 Benchao Li
可以尝试把data字段定义为一个map类型。 111 于2020年3月31日周二 下午2:56写道: > Hi, > 我们在使用streamsets作为CDC工具,输出到kafka中的内容是嵌套多变的类型,如: > {database:a, table: b, type:update, data:{a:1,b:2,c:3}} > {database:a, table: c, type:update, data:{c:1,d:2}} > 请问这种类型该如何定义DDL? > > > Best, > Xinghalo

Re: Re: 实现 KafkaUpsertTableSink

2020-03-30 文章 Benchao Li
xchunj...@163.com > Send Time: 2020-03-29 10:32 > Receiver: user-zh@flink.apache.org > Subject: RE: 实现 KafkaUpsertTableSink > Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。 > -Original Message- > From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org > On Behalf

Re: Re: flinksql如何控制结果输出的频率

2020-03-28 文章 Benchao Li
出从0点到2点的总交易额)我想获得如下结果: > 2020-03-28T01:00 100 > 2020-03-28T02:00 280 > > 2020-03-28T23:00 18000 > 2020-03-28T00:00 19520 > 2020-03-29T01:00 120 > 2020-03-29T01:00 230 > 我应该获得是一个不断append的数据流,而不是retract数据流。 > 并且设置提前发射的事件,flink应该是选取的处理时间而不是事件时间? > > > > > >

Re: 实现 KafkaUpsertTableSink

2020-03-28 文章 Benchao Li
ad service provider for table > factories.", e); >} > > } > > > > > > 直接在返回的 result 里面添加对应的 KafkaUpsertTableSourceSinkFactory 是可 > 以成功运行的。 > > 非常感谢 > > > > > > -- > > Thanks > > venn > > > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: flinksql如何控制结果输出的频率

2020-03-27 文章 Benchao Li
Jark, 这个功能我们用的还挺多的~ 现在还有个痛点是window operator不支持retract输入,所以用了emit就没有办法做到窗口级联使用了。 Jark Wu 于2020年3月27日周五 下午8:01写道: > Benchao 可以啊。这么隐秘的实验性功能都被你发现了 :D > > On Fri, 27 Mar 2020 at 15:24, Benchao Li wrote: > > > Hi, > > > > 对于第二个场景,可以尝试一下fast emit: > > table

Re: flinksql如何控制结果输出的频率

2020-03-27 文章 Benchao Li
; - 1分钟窗口 > - 5分钟窗口,计算只是保存数据,发送明细数据结果 > > Best, > Jingsong Lee > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: flink sql 去重算法

2020-03-19 文章 Benchao Li
过java的set容器去重的呢? -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: Re: sql关键字问题

2020-03-18 文章 Benchao Li
查询语句,发现即使用了`` 查询的时候还是会报Table是关键字的问题。 > SQL parse failed. Encountered "Table" at line 1,column 19. 但是一旦我把 > `event_ts` as > to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'), > WATERMARK FOR event_ts AS event_ts - interval '60’ second 这两行去掉 > ,就

Re: FlinkSQL 1.10 DDL无法指定水印

2020-03-11 文章 Benchao Li
> identifier 'date' > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) > ... 63 more > flink无法识别关键字date/time?? > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: [SURVEY] 您在使用什么数据变更同步工具(CDC)?

2020-03-11 文章 Benchao Li
在对接一些 CDC (Change Data Capture) 工具,以期在下个版本能支持读取和处理常见的 binlog > 数据,所以需要调研下大家目前主要使用的 CDC 工具是什么。 > > 欢迎大家填下问卷调查,您的反馈对我们非常重要,谢谢! > > http://apacheflink.mikecrm.com/wDivVQ1 > > 也欢迎大家在这个邮件下讨论关于 Flink 对接 CDC 的一些想法、需求、期望。 > > Best, > Jark > -- Benchao Li School of El

Re: Re: Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-02 文章 Benchao Li
ink maybe the problem is related with maven jar > classpath. But not sure about that. > > If you can submit the job by a shade jar through cluster , could you share > the project pom settings and sample test code ? > > > > > At 2020-03-02 20:36:06, "Benchao Li" wr

Re: Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-02 文章 Benchao Li
g_tot_aset_y365 *double *, > avg_aset_create_y > *double*) *WITH *( > *'connector.type' *= *'jdbc'*, > *'connector.url' *= *''*, > *'connector.table' *= *'app_cust_serv_rel_info'*, > *'connector.driver' *= *'com.mysql.jdbc.Driver'*, > *'connector.username' *= *'admin'*, &g

Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 文章 Benchao Li
me_product_sell_007118'*, > *'connector.document-type' *= *'_doc'*, > *'update-mode' *= *'upsert'*, > *'connector.key-delimiter' *= *'$'*, > *'connector.key-null-literal' *= *'n/a'*, > *'connector.bulk-flush.interval' *= *'1000'*, > *'format.type' *= > *'json'*) > > > >

Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 文章 Benchao Li
quot;-MM-dd"*; > SimpleDateFormat sf = *new *SimpleDateFormat(format); > *return *sf.format(date); > } *else *{ > String format = *"-MM-dd**\'**T**\'**HH:mm:00.000+0800"*; > SimpleDateFormat sf = *new *SimpleDateFormat(format); &g

Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 文章 Benchao Li
ting and use udf, the job can submit > successfully too. > > I dive a lot with this exception. Maybe it is related with some > classloader issue. Hope for your suggestion. > > > > > > 在 2020-03-01 17:54:03,"Benchao Li" 写道: > > Hi fulin, > > It seems li

Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 文章 Benchao Li
er.java:411) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) > at > org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) > at org.codehaus.

Re: How does Flink manage the kafka offset

2020-02-20 文章 Benchao Li
t; .withKeyDeserializer(KeyDeserializer.class) > .withValueDeserializerAndCoder(getDeserializer(encoding), new > JsonNodeCoder<>()) > .withConsumerConfigUpdates(consumerConfig) > .withoutMetadata(); > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: [ANNOUNCE] Dian Fu becomes a Flink committer

2020-01-16 文章 Benchao Li
Dian Fu and welcome on board! >> >> >> >> Best, >> >> Jincheng >> >> >> >> Shuo Cheng 于2020年1月16日周四 下午6:22写道: >> >> >> >>> Congratulations! Dian Fu >> >>> >> >>> > Xingbo Wei Zho

Re: Flink Sql Join, how to clear the sql join state?

2020-01-15 文章 Benchao Li
er join in my code,I saw the flink > document, the flink sql inner join will keep both sides of the join input > in Flink’s state forever. > As result , the hdfs files size are so big , is there any way to clear the > sql join state? > Thanks to your reply. > -- Benchao Li Sc

Re: flink on yarn jdk版本问题

2020-01-14 文章 Benchao Li
ink特定使用yarn时的jdk目录,flink中是否有这样的功能?(不影响其他yarn上应用正常运行) > > > > ____________ > zjfpla...@hotmail.com > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: Re: flink消费kafka数据,如何从kafka端获取消费偏移信息

2020-01-12 文章 Benchao Li
-consumers-offset-committing-behaviour-configuration wqpapa 于2020年1月12日周日 下午9:34写道: > 感谢回复!主要想了解下如何从kafka端后台命令方式获取对应消费组的偏移信息。之前通过普通的java代码消费kafka数据,通过kafka-consumer-groups.sh > --describe可获取到消费偏移信息,但通过flink消费,不知道要怎么在kafka端获取偏移信息? > > > > > > > > > > 在

Re: flink消费kafka数据,如何从kafka端获取消费偏移信息

2020-01-12 文章 Benchao Li
-configuration wqpapa 于2020年1月12日周日 下午9:09写道: > flink通过FlinkKafkaConsumer消费kafka主题,设置group.id > 按setStartFromGroupOffsets取偏移,如何在kafka端获取对应group.id的偏移信息? > kafka为0.9版本,在zk那边找不到对应的group.id;通过kafka-consumer-groups.sh --describe > --group ,也取不到。请问下要怎么获取啊? -- Benchao Li School of Electronics

Re: flink遇到 valueState 自身的 NPE

2020-01-09 文章 Benchao Li
我感觉这个地方好像没有道理会有`uniqMark`变成`null`, 除非是什么地方反序列化出来`StreamMap`,并且没有调用`StreamMap.open()`. 但是看起来`StreamTask`是可以保证先调用`open`,再调用operator的处理函数的。我也看不出来这个地方有什么问题。 Kevin Liao 于2020年1月9日周四 下午8:15写道: > https://tva4.sinaimg.cn/large/63137227ly1gaqkn1nlykj20mm0wvgq8.jpg > > 抱歉,再试试这个 > > Benchao

Re: flink遇到 valueState 自身的 NPE

2020-01-09 文章 Benchao Li
jFlCK_-iC19CPfsTVCtwN3tdKnaKdLe2TbfVdFA0DnBUz8NrhV-mvmZlEwi9-ngK-WOy8yjA4fin1zaE2SJCf2zfBSZwGR2eY_E_WZQiFRmSBI2A7vpoyFvTV3E99MIi0MC5PUAeRiu4v4JIVDkV_yUGIUvoa7pxdf7LpZN_DbikQVk7yES8kxxL5qG2Eae8vftWJuBVi5mWTxvElBgInyUntobXHdxfb2YR4JdBgVPN5QionJiIc9g5i0ClGECZbyHPbsQy4pEVw=s0-l75-ft-l75-ft > > 谢谢,看看能否看见 &g

Re: flink遇到 valueState 自身的 NPE

2020-01-09 文章 Benchao Li
hi Kevin, 邮件里面贴不了图片,如果要贴图片,需要用一些第三方的图床工具。 或者你可以直接贴文字? Kevin Liao 于2020年1月9日周四 下午7:10写道: > [image: B40C260D-DCC3-4B7D-A024-3839803C2234.png] > > Benchao Li 于2020年1月9日周四 下午6:42写道: > >> hi Kevin, >> >> 能贴一下MyMapFunction2.java:39 这里的代码吗? 从上面的日志看不出来是valueState是null呢。

Re: flink遇到 valueState 自身的 NPE

2020-01-09 文章 Benchao Li
) > at > > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) > at > org.apache.flink.streaming.runtime.io > .StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164) > at > org.apache.flink.streaming.runtime.io > .Stream

Re: Re: Flink SQL Count Distinct performance optimization

2020-01-08 文章 Benchao Li
t; >>> ( >>> >>> SELECT >>> >>> 'ZL_005' as aggId, >>> >>> 'ZL_UV_PER_MINUTE' as pageId, >>> >>> deviceId, >>> >>> ts2Date(recvTime) as statkey >>> >>> from >>> >>> kafka_zl_etrack_event_stream >>> >>> ) >>> >>> GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024) >>> >>> ) as t1 >>> >>> group by aggId, pageId, statkey >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> Best >> >> > > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: 使用Flink SQL时,碰到的 【Window can only be defined over a time attribute column】

2020-01-06 文章 Benchao Li
; INSERT INTO sql_out > SELECT product_id, TUMBLE_END(ts, INTERVAL '2' MINUTE) AS window_end, > count(*) AS cnt FROM kafka_out WHERE behavior = 'pv' GROUP BY product_id, > TUMBLE(ts, INTERVAL '2' MINUTE) > > > 错误信息:Window can only be defined over a time attribute column. 请大佬帮我解惑,谢

Re: SQL层应用维表join jdbc的时候,请问怎么动态感知维表数据的变化呢?

2020-01-03 文章 Benchao Li
= o.currency 详细内容可以参考文档: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#joins 刘世民 于2020年1月4日周六 上午11:27写道: > hi~ > 如图所示,在做kafka和jdbc > join的时候,jdbc数据全量加载并为Finished状态,这种情况下请问怎么感知jdbc表的数据更新呢?还是我哪里的配置不对,还请赐教 > > 小白敬上~ > > > > -- Bench

Re: [DISCUSS] Drop Kafka 0.8/0.9

2019-12-06 文章 Benchao Li
egards, > >>> > > > >>> > > Chesnay > >>> > > > >>> > > > >>> > > >>> > -- > >>> > > >>> > Konstantin Knauf | Solutions Architect > >>> > > >>> > +49 160 91394525 > >>> > > >>> > > >>> > Follow us @VervericaData Ververica <https://www.ververica.com/> > >>> > > >>> > > >>> > -- > >>> > > >>> > Join Flink Forward <https://flink-forward.org/> - The Apache Flink > >>> > Conference > >>> > > >>> > Stream Processing | Event Driven | Real Time > >>> > > >>> > -- > >>> > > >>> > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > >>> > > >>> > -- > >>> > Ververica GmbH > >>> > Registered at Amtsgericht Charlottenburg: HRB 158244 B > >>> > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, > Ji > >>> > (Tony) Cheng > >>> > > >>> > >> > > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: flink sql中怎么表达窗口的提前触发或延迟触发

2019-09-11 文章 Benchao Li
gt; > "ORDER" ... > "LIMIT" ... > "OFFSET" ... > "FETCH" ... > "," ... > > 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn

<    1   2   3   4