Re: 关于kafka connector通过python链接

2020-04-10 文章 Dian Fu
您好,

图片看不到,可以把图片上传到第三方网站,然后贴个链接,或者贴一下文本的报错信息。


> 在 2020年4月9日,下午4:41,秦寒  写道:
> 
> 您好
>根据你们的说明我做了如下配置,我用的是flink 1.10版本
> 1在pyflink/lib下面添加了kafka-clients-2.2.0.jar
> 
>  
> 2 在build-target/lib下面添加了flink-sql-connector-kafka_2.11-1.10.0.jar  
> flink-connector-kafka_2.11-1.10.0.jar 以及 flink-json-1.10.0-sql-jar.jar
> 
>  
> 3 构建PyFlink发布包并安装
> cd flink-python; python setup.py sdist 
> pip install dist/*.tar.gz
>  
>  
> 4 执行测试程序tumble_window.py报错如下,不知道你们有没有遇见过这个错误,望能解答
> [yy1s@rbtnode1 project]$ python3 tumble_window.py
> 
> 
>  
>  
>  
> 发件人: Hequn Cheng  
> 发送时间: 2020年4月9日 10:08
> 收件人: user-zh 
> 抄送: han...@chinaums.com
> 主题: Re: 关于kafka connector通过python链接
>  
> Hi 秦寒,
>  
> Dian 说得很完善了。除此之外,金竹的博客[1]有介绍“Python API 中如何使用 Kafka”,可能对你有帮助,可以看下。
>  
> Best, Hequn
>  
> [1] 
> https://enjoyment.cool/2019/08/28/Apache%20Flink%20%E8%AF%B4%E9%81%93%E7%B3%BB%E5%88%97-%20Python%20API%20%E4%B8%AD%E5%A6%82%E4%BD%95%E4%BD%BF%E7%94%A8%20Kafka/
>  
> 
>  
> On Thu, Apr 9, 2020 at 9:34 AM Dian Fu  > wrote:
>> 你指的是Python Table API中如何使用kafka connector的例子吗?这个是有例子的[1]。
>> 
>> 关于如何把kafka client的jar包配置到Python环境,分两种情况,当前有对应的两种解决方案:
>> 1)如果是local运行,需要把kafka client的jar拷贝到python环境中pyflink的lib目录下
>> 2)如果是remote运行,可以通过CLI的-j选项添加。
>> 
>> 这两种方式对于Python用户来说可能都不太便捷,所以已有一个JIRA[3]在考虑添加另外一种对Python用户来说更友好的方式,欢迎到JIRA里参与讨论。
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>  
>> >  
>> >
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html 
>>  
>> > >
>> [3] https://issues.apache.org/jira/browse/FLINK-16943 
>>  
>> > >
>> > 在 2020年4月9日,上午8:45,zhisheng > > > 写道:
>> > 
>> > hi, 秦寒
>> > 
>> > 暂时还没有 Python 这块的 API,可以去社区 JIRA 提建议
>> > 
>> > Best
>> > 
>> > zhisheng
>> > 
>> > 秦寒 mailto:han...@chinaums.com>> 于2020年4月8日周三 
>> > 下午4:10写道:
>> > 
>> >> 您好
>> >> 
>> >>   Flink的 kafka connector 文档中只有java 和scala的列子,能否添加python
>> >> 调用kafka的列子,包括如何添加kafka connector,kafka client的jar包配置到pyhon
>> >> 环境等,谢谢。
>> >> 
>> >> 
>> >> 
>> >> 
>> 



回复: 关于Flink1.10.0 flink-hbase guava依赖冲突问题

2020-04-10 文章 111
Hi,
具体的哪个版本不太确定,我只差了11,12,14,16,18,23这些….12以上都没有的
Best,
Xinghalo

flink反压问题求助

2020-04-10 文章 Junzhong Qin
在跑Flink任务时,遇到了operator反压问题,任务执行图如下,source(读Kafka),
KeyBy(抽取数据字段供keyBy操作使用),Parser(业务处理逻辑),Sink(写Kafka),除了KeyBy->Parser使用hash(KeyBy操作)链接,其他都使用RESCALE链接。(并发度仅供参考,这个是解决问题后的并发度,最初的并发度为
500->1000->3000->500)
[image: image.png]
相关metric
[image: image.png]
[image: image.png]
为了解决反压问题做的处理:
1. 增大Parse并发,KeyByOpe.buffers.outPoolUsage 上升速率有减缓,多次加并发依然没有解决
2. 优化Parse逻辑,减少CPU使用,效果不明显
3. 将Parse里的一些数据过滤逻辑移到KeyBy operator里面,效果不明显
最后猜测可能是KeyBy operator并发大和Parse链接hash操作占用NetWork资源过多导致反压,于是减少KeBy
operator的并发度,发现解决问题。但是想请教一下这个操作解决这个问题的具体原因。

谢谢!


Re: 关于flink state的问题

2020-04-10 文章 Congxian Qiu
你好:

1 state 如果不需要了,可以自己删除(如果是 Window 中使用的,可以进行配置在 Window 结束时删除)
2 可以使用 TTL State[1]
3 如果仅考虑 OOM 风险,可以考虑使用 RocksDBStateBackend

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#state-time-to-live-ttl
Best,
Congxian


guanyq  于2020年4月11日周六 上午7:21写道:

> 您好:
>
> 1.随着程序的运行,task内存中的状态会不断增加,迟早会出现内存溢出问题,想知道一般都如何解决这个问题?
>
>


Re: 重复声明watermark的问题

2020-04-10 文章 lec ssmi
谢谢,主要是我再次声明watermark后,再转成table,然后再window操作,就一直报错,现在能确定是时间属性字段的问题。我用的阿里云的blink,他们开发人员说好像不能这么操作。

jun su  于 2020年4月10日周五 23:36写道:

> hi,
> 1. 以我的理解, 再次下发watermark会覆盖之前的, 所以在经过n个operator后,可以用再次声明watermark的方式来调整.
> 2. 如果是DataStream模式下, 两个流join后, 下游收到的watermark是较小的流的watermark,
> 如果是stream模式下的sql window join, 那么下游收到的
> watermark是较小一侧表的watermark - sql设定的时间窗口.
>
> lec ssmi  于2020年4月8日周三 下午2:05写道:
>
> > 大家好:
> >   请问,对一个DataStream重复声明watermark,前面的watermark会被覆盖掉吗?
> >   比如我再source端声明了watermark,进行了一系列操作后,我觉得watermark的延迟不满足需求,就再次声明一次。
> >   另外,稍微咨询下另外一个问题,两个流join之后,watermark会消失吗?看书上说的是,以两个流最小的watermark(全局最小)为准。
> >   主要是在阿里云Blink上,使用sql进行join后,说的是时间属性字段会消失。有点不明白。
> >
>
>
> --
> Best,
> Jun Su
>


关于flink state的问题

2020-04-10 文章 guanyq
您好:

1.随着程序的运行,task内存中的状态会不断增加,迟早会出现内存溢出问题,想知道一般都如何解决这个问题?

 

Re: 重复声明watermark的问题

2020-04-10 文章 jun su
hi,
1. 以我的理解, 再次下发watermark会覆盖之前的, 所以在经过n个operator后,可以用再次声明watermark的方式来调整.
2. 如果是DataStream模式下, 两个流join后, 下游收到的watermark是较小的流的watermark,
如果是stream模式下的sql window join, 那么下游收到的
watermark是较小一侧表的watermark - sql设定的时间窗口.

lec ssmi  于2020年4月8日周三 下午2:05写道:

> 大家好:
>   请问,对一个DataStream重复声明watermark,前面的watermark会被覆盖掉吗?
>   比如我再source端声明了watermark,进行了一系列操作后,我觉得watermark的延迟不满足需求,就再次声明一次。
>   另外,稍微咨询下另外一个问题,两个流join之后,watermark会消失吗?看书上说的是,以两个流最小的watermark(全局最小)为准。
>   主要是在阿里云Blink上,使用sql进行join后,说的是时间属性字段会消失。有点不明白。
>


-- 
Best,
Jun Su


Re: 如何合并 binlog stream 和 table stream?

2020-04-10 文章 刘宇宝
抄了下 
https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestSequentialReadingStreamOperator.java
 , 可以达到串行的效果了:

   DataStream binlogStream = env.addSource(new FlinkKafkaConsumer(…));
   DataStream snapshotStream = 
env.createInput(JDBCInputFormat.buildJDBCInputFormat()….);
   DataStream tableStream = snapshotStream.connect(binlogstream);
   tableStream.transform(“Concat”, new TypeHint<….>(){},   new 
SequentialReadingStreamOperator<>());

但是如果打开 checkpointing,那么 flink 会报错:java.lang.UnsupportedOperationException: 
Checkpointing is currently not supported for operators that implement 
InputSelectable

跪了,跪了。。。


On 2020/4/7, 4:45 PM, "刘宇宝"  wrote:

你这是个新思路,分成两个 job,但是感觉不太值当,或许这里是 Flink目前 
API或者说编程模型很受限的地方,我只是源头数据来自两个地方,要合并下两个数据源,所有下游处理都是一样的。如果按照 actor 的松散模式,我是可以在两个 
SourceActor 之间协调的,一个 SourceActor 发完后,通知另一个 SourceActor 再发,或者启动一个新的 
SourceActor,大家都往同一个下游 actor 发消息。

user@flink 里那个哥们提到 InputSelectable,我还没看明白怎么能用到 DataStream上,似乎它只实现在 
StreamOperator 上:  
https://github.com/apache/flink/search?p=2=InputSelectable_q=InputSelectable

我目前想到一个笨方法,实现一个 SourceFunction,把 FlinkKafkaConsumerBase 和 JDBCInputFormat 
包到一起,这样可以先把 JDBCInputFormat 数据发完了,再发 FlinkKafkaConsumerBase。 
但是这样做只能单并发,多并发的话需要一个分布式的 barrier,flink 没有内置支持,感觉不是个优美的解决方案。

非常感谢你的解答!



On 2020/4/7, 4:29 PM, "Jark Wu"  wrote:

如果你的作业没有 state,那么 全量和增量部分,可以分成两个独立的作业。
如果你的作业有 state,主要就是设计 state 复用的问题。两个作业的拓扑结构需要确保一致(为了复用 savepoint),一个作业的
source operator 是 jdbc,另一个 source operator 是 kafka。
当运行完 jdbc 作业后,对作业触发一次 savepoint,然后用这个 savepoint 恢复 kafka 作业,可以从 earliest
开始读取(假设作业支持幂等)。

这里的一个问题,主要是如何对 jdbc 作业触发 savepoint,因为 jdbc InputFormat 目前是
bounded,所以读完后整个作业就结束了,就无法进行 savepoint。
所以这里可能需要自己修改下源码,让 jdbc source 永远不要结束,但通过日志或者 metric 
或其他方式通知外界数据已经读完(可以开始触发
savepoint)。

希望这些可以帮助到你。

Best.
Jark


On Tue, 7 Apr 2020 at 16:14, 刘宇宝  wrote:

> 我看了下 streamsql 那个 bootstrap 一段,没看懂怎么处理,我现在已经有 mysql table,算是 
materialized
> view 了,也有一份
> Kafka 里的 binlog 数据,问题的麻烦在于怎么「先消费 jdbc table, 消费完后,再切换到 kafka 上」,从 
flink
> 文档来看,一旦
> 我把 jdbc table 的流和 kafka 的流加入 env 里,env.execute() 
之后,两个流就同时往下游发数据了——我期望的是
> jdbc table
> 的流发完了,才开始发 kafka 的流。
>
> 谢谢!
>
> On 2020/4/7, 2:16 PM, "Jark Wu"  wrote:
>
> Hi,
>
> 你这里的合并是用join 来做么? 这样的话,会比较耗性能。
>
> 一种做法是先消费 jdbc table, 消费完后,再切换到 kafka 上。这种要求 binlog 
是幂等操作的,因为会有多处理一部分的
> binlog,没法做到 精确地切换到 kafka offset 上。
>
> 另外你也可以参考下 StreamSQL 的 bootstrap 的做法:
> https://streamsql.io/blog/from-apache-kafka-to-apache-pulsar
>
> Best,
> Jark
>
>
> On Sun, 5 Apr 2020 at 22:48, 刘宇宝  wrote:
>
> > 大家好,
> >
> > 我在用 Debezium 从数据库读取最新的 binlog 写入 Kafka,比如对于 
mysql_server.test.tableA
> 有一个
> > topic “mysql_server.test.tableA”,我需要在 Flink 里实现这样的逻辑:
> >
> >
> >   1.  先连接上 Kafka 开始消费 topic “mysql_server.test.tableA”,确保连接成功,记为
> > binlog-stream,但是要暂停消费 Kafka;
> >   2.  用 JDBCInputFormat 读取 test.tableA 到一个 DataStream 里,记为
> table-stream;
> >   3.  合并两个 streams,消费完 table-stream 后再开始消费 binlog-stream,这样可以确保
> binlog 是
> > *后*  应用到某个快照表上。
> >
> > 问题是我怎么能暂停消费 binlog-stream 呢? 我目前想到的办法是用 flink state 做一个全局状态
> > startBinlog,初始值为 false:
> >
> >   binlog-stream -> waitOperator   ->   sinkOperator
> >   table-stream -> notifyOperator -> sinkOperator
> >
> > 两个流被合并输出到 sinkOperator,waitOperator() 会 while loop阻塞式的检查全局状态, 等
> > table-stream 消费完(不知道怎么判断消费完了。。。),  notifyOperator 修改全局状态,这样
> binlog-stream
> > 就能被继续消费了。
> >
> > 但由于 kafka consumer 如果长期阻塞不 ack 的话,kafka consumer 
会被断开,所以这个做法应该是不行的。
> >
> > 请教怎么破?
> >
> > 谢谢!
> >
> >
>
>
>






dayofweek异常

2020-04-10 文章 allanqinjy
hi,
   在flink中使用hql函数的时候 dayofweek  报错,编译都没有通过。我的使用方式 
用-MM-dd也是一样的错误。哪位大神遇到过!
DAYOFWEEK(to_date(from_unixtime (unix_timestamp(cast(dt as 
string),'MMdd'),'-MM-dd HH:mm:ss')))


,Cannot apply 'EXTRACT' to arguments of type 'EXTRACT( FROM 
)'. Supported form(s): 'EXTRACT( FROM 
)'
10-04-2020 16:26:14 CST user_visit_terminal_prefer_7d INFO - 
'EXTRACT( FROM )'
10-04-2020 16:26:14 CST user_visit_terminal_prefer_7d INFO -at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
10-04-2020 16:26:14 CST user_visit_terminal_prefer_7d INFO -at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)

Re: Flink SQL文件连接器中proctime()创建表失败的问题

2020-04-10 文章 Jark Wu
这应该是个已知问题,https://issues.apache.org/jira/browse/FLINK-16160
请先使用 DDL 吧。

Best,
Jark

On Fri, 10 Apr 2020 at 16:21, Night_xing <1029681...@qq.com> wrote:

> Flink版本:1.10.0
>
> 使用了BlinkPlanner之后,使用java代码创建本地的CSV表时,不支持proctime的配置
>
> 创建代码如下:
> tableEnv.connect(new FileSystem()
> .path("file:///Users/test/csv/demo.csv")
> )
> .withFormat(new Csv())
> .withSchema(
> new Schema()
> .field("id", DataTypes.STRING())
> .field("name", DataTypes.STRING())
> .field("user_action_time",
> DataTypes.TIMESTAMP(3)).proctime()
> )
> .registerTableSource("csv_table");
>
> 异常信息如下:
> Reason: No factory supports all properties.
>
> The matching candidates:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> Unsupported property keys:
> schema.#.proctime
>
> 如果去掉proctime的设置就没有问题。使用DDL创建则没有任何问题:
> tableEnv.sqlUpdate("create table csv_table(" +
> "id string," +
> "name string," +
> "user_action_time as PROCTIME()" +
> ") with (" +
> " 'connector.type' = 'filesystem'," +
> " 'connector.path' = 'file:///Users/test/csv/demo.csv'," +
> " 'format.type' = 'csv'" +
> ")");


Flink SQL????????????proctime()????????????????

2020-04-10 文章 Night_xing
Flink??1.10.0

??BlinkPlanner??java??CSVproctime??

??
tableEnv.connect(new FileSystem()
.path("file:///Users/test/csv/demo.csv")
)
.withFormat(new Csv())
.withSchema(
new Schema()
.field("id", DataTypes.STRING())
.field("name", DataTypes.STRING())
.field("user_action_time", 
DataTypes.TIMESTAMP(3)).proctime()
)
.registerTableSource("csv_table");

??
Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Unsupported property keys:
schema.#.proctime

proctime??DDL
tableEnv.sqlUpdate("create table csv_table(" +
"id string," +
"name string," +
"user_action_time as PROCTIME()" +
") with (" +
" 'connector.type' = 'filesystem'," +
" 'connector.path' = 'file:///Users/test/csv/demo.csv'," +
" 'format.type' = 'csv'" +
")");

Re: 关于Flink1.10.0 flink-hbase guava依赖冲突问题

2020-04-10 文章 Jark Wu
Hi,

这是一个已知问题,而且确实挺影响开发效率的。

> 其次HBase需要guava12.0版本(更高的版本就移除这个方法了)
你知道 HBase 从哪个版本开始不使用这个方法/版本了吗? 和 HBase 1.4.x server 兼容吗?

cc @ZhengHu  who may have more knowledge on this.

Best,
Jark

On Fri, 10 Apr 2020 at 08:21, 111  wrote:

> Hi,
>
>
> 原来IDEA的问题,那这样就不能debug调试了。
> 目前看如果手动指定guava的版本为16,只有本地的那个HBase服务有问题(内部使用的murmur hash.hasString()生成id)
> 如果把单元测试改成连接远程HBase,应该就没问题了
>
>
> Best,
> xinghalo
>
>
> 在2020年04月9日 23:00,Jingsong Li 写道:
> Hi,
>
> 是的,作业运行起来是没问题的,因为都shade过,但是很不幸的是IDEA运行单测就会有问题。。。
>
> 目前可以通过mvn test的方式来运行单测。
>
> Best,
> Jingsong Lee
>
> On Thu, Apr 9, 2020 at 9:34 PM Yun Gao 
> wrote:
>
> 现在的现象是作业运行起来会有报错么?能发一下pom和大体的作业的样子不?
>
>
> --
> From:111 
> Send Time:2020 Apr. 9 (Thu.) 14:31
> To:user-zh@flink.apache.org 
> Subject:关于Flink1.10.0 flink-hbase guava依赖冲突问题
>
> Hi,
> 我这边发现hbase-connector模块对guava的依赖有冲突:
>
>
>
>
> 首先flink中的calcite需要guava是16以上的版本(低版本没有这个类),因为要使用:com/google/common/collect/MultimapBuilder$SortedSetMultimapBuilder
>
>
> 其次HBase需要guava12.0版本(更高的版本就移除这个方法了),因为要使用:com.google.common.hash.HashFunction.hashString(Ljava/lang/CharSequence;)Lcom/google/common/hash/HashCode;
>
>
> 不知道官方在使用单元测试时,是否有什么特殊的配置能规避这个问题?
>
>
> Best,
> Xinghalo
>
>
>
> --
> Best, Jingsong Lee
>


Flink SQL????????????proctime()????????????????

2020-04-10 文章 Night_xing
Flink??1.10.0

??BlinkPlanner??java??CSVproctime??
??

tableEnv.connect(new FileSystem()

.path("file:///Users/test/csv/demo.csv")
)
.withFormat(new Csv())
.withSchema(
   
 new Schema()
   
 .field("id", DataTypes.STRING())
   
 .field("name", DataTypes.STRING())
   
 .field("user_action_time", 
DataTypes.TIMESTAMP(3)).proctime()
)

.registerTableSource("csv_table");

??
Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Unsupported property keys:
schema.#.proctime

proctime??DDL
tableEnv.sqlUpdate("create table csv_table(" +
"id string," +
"name string," +
"user_action_time as 
PROCTIME()" +
") with (" +
" 'connector.type' = 
'filesystem'," +
" 'connector.path' = 
'file:///Users/test/csv/demo.csv'," +
" 'format.type' = 
'csv'" +
")");