Re: flink dataStream多次sink DAG重复驱动执行?

2021-03-07 文章 Evan
个人理解是不会重复驱动执行的,具体你可以测试一下,但是从底层原理上讲,我也讲不了。 发件人: lp 发送时间: 2021-03-05 17:31 收件人: user-zh 主题: flink dataStream多次sink DAG重复驱动执行? 有个疑问, 如下程序片段: -- Properties properties = new Properties(); properties.setProperty("bootstrap.servers",kafkaAddr);

Re: 退订

2021-03-07 文章 Evan
Hi, 退订请发邮件到 user-zh-unsubscr...@flink.apache.org 详细的可以参考 [1] [1] https://flink.apache.org/zh/community.html#section-1 发件人: zenglong chen 发送时间: 2021-03-08 10:00 收件人: user-zh 主题: 退订 退订

Re: Re: 社区有人实现过Flink的MongodbSource吗?

2021-02-24 文章 Evan
16:23,Evan 写道: > > > 有人完整的实现Flink的MongodbSource吗 > 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化 > >

社区有人实现过Flink的MongodbSource吗?

2021-02-24 文章 Evan
有人完整的实现Flink的MongodbSource吗 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化

????: Pyflink????kerberos??????Kafka??????????

2021-01-30 文章 Evan
?? ?? ?? ?? 2021-01-30 17:53 user-zh ?? Pyflinkkerberos??Kafka?? ?? ??pyflinkkafka?? flink-conf.yamlkerberos

Re: Flink SQL kafka connector有办法获取到partition、offset信息嘛?

2021-01-20 文章 Evan
你好,可以获取 CREATE TABLE KafkaTable ( `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', `partition` BIGINT METADATA VIRTUAL, `offset` BIGINT METADATA VIRTUAL, `user_id` BIGINT, `item_id` BIGINT, `behavior` STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior',

????: flink sql hop????????udaf????????????merge??????????????????????

2021-01-17 文章 Evan
?? ?? bigdata ?? 2021-01-18 14:52 user-zh ?? flink sql hopudafmerge?? ?? flink1.10.1

????: flink sql hop????udaf????

2021-01-17 文章 Evan
merge??marge Evan Cheng 2021??1??18??09:00:07 bigdata ?? 2021-01-17 22:31 user-zh ?? flink sql hopudaf ?? flink1.10.1sql hop??udaf

Re: 回复: flink sql读kafka元数据问题

2021-01-15 文章 Evan
我知道 酷酷同学 啥意思,kafka 消息是key,value形式,当然这个key默认是null,他想在 select 语句里将 kafka的key值读出来对吧。 我也在文档里找了,确实是没有相关文档说明 发件人: 酷酷的浑蛋 发送时间: 2021-01-15 16:35 收件人: user-zh@flink.apache.org 主题: 回复: flink sql读kafka元数据问题 直接读topic,headers是空,我仅仅是想读key,不管topic是谁写入的 在2021年01月14日 16:03,酷酷的浑蛋 写道:

Re: flink1.11使用createStatementSet报错 No operators defined in streaming topology

2021-01-14 文章 Evan
我的也是flink 1.11.0版本的,也是使用的stmtSet.execute()方式,是可以正常运行的,你可以debug检查一下你要执行的SQL语句 发件人: datayangl 发送时间: 2021-01-14 16:13 收件人: user-zh 主题: flink1.11使用createStatementSet报错 No operators defined in streaming topology flink版本: 1.11 使用createStatementSet 在一个人任务中完成多个数据从hive写入不同的kafka 代码如下: def main(args:

回复: 回复: 请教个Flink checkpoint的问题

2021-01-14 文章 Evan
RocksDBStateBackend(checkpointPath); streamEnv.setStateBackend(rocksDBStateBackend); yinghua...@163.com 发件人: Evan 发送时间: 2021-01-14 17:55 收件人: user-zh 主题: 回复: 请教个Flink checkpoint的问题 代码图挂掉了,看不到代码 发件人: yinghua...@163.com 发送时间: 2021-01-14 17:26 收件人: user-zh 主题: 请教个Flink checkpoint的问题 我在yarn上提交任务时,设置

回复: 请教个Flink checkpoint的问题

2021-01-14 文章 Evan
代码图挂掉了,看不到代码 发件人: yinghua...@163.com 发送时间: 2021-01-14 17:26 收件人: user-zh 主题: 请教个Flink checkpoint的问题 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下

Re: Re: Flink webui 查询任务信息报错500

2021-01-13 文章 Evan
你可以从akka的原理架构中获取一些信息 参考: https://akka.io 之前在其他博主的介绍中看到过,感觉意思差不多,上面说这是JobManager和TaskManager之间通信时发送的消息大小的最大值 发件人: 赵一旦 发送时间: 2021-01-14 14:19 收件人: user-zh 主题: Re: Flink webui 查询任务信息报错500 好的,我找到了这个参数。不过这个参数表达啥含义知道吗,我看10MB不是个小数字感觉。 Evan 于2021年1月14日周四 下午1:54写道: > 有这样一个参数“akka.framesize”

Re: Flink webui 查询任务信息报错500

2021-01-13 文章 Evan
有这样一个参数“akka.framesize” ,可以在你启动flink的时候加上 或者 在conf/flink-conf.yaml 配置上: akka.framesize "10485760b"StringMaximum size of messages which are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, then you should increase it. The message size requires

Re: Flink webui 查询任务信息报错500

2021-01-13 文章 Evan
这是flink的Akka部分报的错,相关源码如下,可以找找这个 maximumFramesize 怎么修改? https://github.com/apache/flink/blob/d093611b5dfab95fe62e4f861879762ca2e43437/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java: private Either, AkkaRpcException> serializeRemoteResultAndVerifySize(

Re: FlinkSQL Filter Error With Float Column on flink-1.12.0

2021-01-12 文章 Evan
你好,在数据库中,Float类型存的是个近似值,不能用类似于 = 或者 != 的比较语句,所以也不支持IN操作 希望能帮助到你 From: jy l Date: 2021-01-12 18:04 To: user-zh Subject: FlinkSQL Filter Error With Float Column on flink-1.12.0 Hi: Flink SQL filter data throw an exception, code: def main(args: Array[String]): Unit = { val env =

回复: flink sql消费kafka sink到mysql问题

2021-01-05 文章 Evan
flinksql 貌似是目前做不到你说的这样 发件人: air23 发送时间: 2021-01-06 12:29 收件人: user-zh 主题: flink sql消费kafka sink到mysql问题 你好。我这边在跑任务时候 发现使用flink sql消费kafka如果报错了 然后再重启 发现报错的数据 会丢失 采用的scan.startup.mode' = 'group-offsets' 按理说 不是要重新消费 失败的那条数据 开始消费吗? 请问如何配置 可以不丢失数据 CREATE TABLE source1 ( id BIGINT ,

????: flinksql1.11 ????phoenix????????Caused by: org.apache.calcite.avatica.NoSuchStatementException

2021-01-05 文章 Evan
??bug ?? 2021-01-05 20:20 user-zh ?? flinksql1.11 phoenixCaused by: org.apache.calcite.avatica.NoSuchStatementException ?? flinkv1.11phoneix 1.14.1 CREATE TABLE

回复: 邮件退订

2020-12-15 文章 Evan
你好,退订需发邮件到 user-zh-unsubscr...@flink.apache.org 更多详细情况可以参考 [1] https://flink.apache.org/community.html#mailing-lists 发件人: 谢治平 发送时间: 2020-12-16 09:08 收件人: user-zh 主题: 邮件退订 您好,邮件退订一下

Re: flink无法写入数据到ES中

2020-12-13 文章 Evan
你的SQL语句语法有误,请参考: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/elasticsearch.html 希望能帮助到你! 发件人: 小墨鱼 发送时间: 2020-12-11 14:46 收件人: user-zh 主题: flink无法写入数据到ES中 我在使用Flink写入数据到ES中,程序可以执行成功但是ES中没有数据,而且没有任何报错信息我首先创建了一个sink的es表String sql = "CREATE TABLE es_sink

Re: Re: 关于cluster.evenly-spread-out-slots参数的底层原理

2020-11-06 文章 Evan
发件人: Shawn Huang 发送时间: 2020-11-06 16:56 收件人: user-zh 主题: Re: 关于cluster.evenly-spread-out-slots参数的底层原理 我说一下我看源码(1.11.2)之后的理解吧,不一定准确,仅供参考。 cluster.evenly-spread-out-slots 这个参数设置后会作用在两个地方: 1. JobMaster 的 Scheduler 组件 2. ResourceManager 的 SlotManager 组件 对于 JobMaster 中的 Scheduler, 它在给

????: ?????? pyflink??where??????????????????????

2020-11-01 文章 Evan
where??where??pyflink??api??where?? ?? 2020-11-02 10:15 user-zh ?? ?? pyflink??where?? ??

?????? flink1.11????????

2020-07-20 文章 Evan
Hi,??jar??1??45jar ---- ??:

FlinkSQL 任务提交后 任务名称问题

2020-07-18 文章 Evan
代码大概是这样子的,一张kafka source表,一张es Sink表,最后通过tableEnv.executeSql("insert into esSinkTable select ... from kafkaSourceTable")执行 任务提交后任务名称为“inset-into_某某catalog_某某database.某某Table” 这样很不友好啊,能不能我自己指定任务名称呢?

回复:ddl es 报错

2020-07-09 文章 Evan
Hello, 这个报错,在flink 1.11 最新版本我也遇见了,跟你同样的操作 真正原因是这个ddl 是flink 的sink table,是数据写入端,不能打印数据。 而tableEnv.toRetractStream(table, Row.class).print(); 这个打印的数据方法只适合flink 的Source Table,也就是数据输入端,比如kafka table就可以正常使用。 2020年7月9日15:31:56 --原始邮件-- 发件人:"出发"<573693...@qq.com;

代码中如何取消正在运行的Flink Streaming作业

2020-07-08 文章 Evan
这个问题之前看到过有人在问,但是没有看到答案,我想问一下,Flink Streaming API有没有提供类似的接口,调用后就能停止这个Stream作业呢?

回复:fink新增计算逻辑时kafka从头开始追平消费记录

2020-04-07 文章 Evan
之前的代码好像乱码了,我设置了一下,重新发一下,建议你 在获取consumer之后,再设置一下consumer.setStartFromLatest();,这样设置的参考就是官网文档介绍的,这是我之前翻译的,可以看一下后边关于【Kafka Consumers 从指定位置开始消费】的解释,链接:https://www.jianshu.com/p/b753527b91a6 /** * @param env * @param topic * @param time 订阅的时间 * @return * @throws IllegalAccessException

回复: 回复:fink新增计算逻辑时kafka从头开始追平消费记录

2020-04-07 文章 Evan
苟刚你好,刚才看了你的kafka消费代码,建议你在获取consumer后,增加一行如下代码 “consumer.setStartFromLatest();”然后再测试一下。 /** * @param env * @param topic * @param time 订阅的时间 * @return * @throws IllegalAccessException */ public static DataStreamSource

??????fink??????????????kafka????????????????????

2020-04-07 文章 Evan
??kafkaoffset ---- ??:""

?????? Re: flink????kafka????????????kafka??????????????????

2020-01-12 文章 Evan
??kafka??Offset?? kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zkhost:2181 --group ${group.id} --topic ${topic_name} zkhost ??group.id??topic_name Group Topic Pid Offset logSizeLag Owner test

Flink????????????

2019-09-09 文章 Evan
??flink1.7.1 ??centos 7 job??start-cluster.sh test04 ??flink job?? $ bin/flink run -m test04:8081 -c org.apache.flink.quickstart.factory.FlinkConsumerKafkaSinkToKuduMainClass flink-scala-project1.jar

Flink????????????

2019-09-09 文章 Evan
??flink1.7.1 ??centos 7 job??start-cluster.sh test04 ??flink job?? $ bin/flink run -m test04:8081 -c org.apache.flink.quickstart.factory.FlinkConsumerKafkaSinkToKuduMainClass flink-scala-project1.jar