stream sink hive in HDFS HA mode

2020-08-02 Thread air23
Hi,
  Our cluster is CDH and is configured with HDFS HA mode.
 When using a Kafka sink to Hive, the nameservice cannot be resolved:
java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservices1


How should this be configured under HA mode?

Re: pyflink consuming Kafka protobuf data

2020-08-02 Thread Benchao Li
Hi,

Flink does not have a built-in protobuf format yet; the community is discussing implementing one [1], expected for 1.12.

For now, you could implement a custom format, or define a format that simply forwards the raw byte[] data without any parsing,
and then parse it with a UDF.

[1] https://issues.apache.org/jira/browse/FLINK-18202
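For the UDF route, a rough Java sketch could look like the following. This is only an illustration: `MyEvent` stands for a protobuf-generated message class (an assumption, not something Flink provides), and JsonFormat comes from protobuf-java-util.

import org.apache.flink.table.functions.ScalarFunction;

import com.google.protobuf.util.JsonFormat;

// Minimal sketch of the "forward byte[] and parse in a UDF" idea suggested above.
// MyEvent is a placeholder for your own protobuf-generated message class.
public class ProtoToJson extends ScalarFunction {

    public String eval(byte[] rawBytes) {
        if (rawBytes == null) {
            return null;
        }
        try {
            MyEvent event = MyEvent.parseFrom(rawBytes);
            // Render the message as JSON so downstream SQL can work with plain strings.
            return JsonFormat.printer().print(event);
        } catch (Exception e) {
            // Return null instead of failing the job on a single bad record.
            return null;
        }
    }
}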

stephenlee <871826...@qq.com> wrote on Fri, Jul 31, 2020 at 10:27 PM:

> Hi all,
> I'm new to Flink. How can I consume Kafka protobuf data with pyflink? I tried reading it as a string
> without success, and I could not find anything related in the official pyflink docs. Any help would be appreciated.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 

Best,
Benchao Li


flink??ScalarFunction????????

2020-08-02 Thread ??????
  
flinkudf??ScalarFunctionkafkaScalarFunction??udf1??flinkkafka??flinkudf1kafka??flink??udf1??udf1flink
  

Re: Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer

2020-08-02 Thread Qingsheng Ren
Hi Vikash,

Sorry for the late reply. Is your version of Flink kafka *connector* 1.10.1
too? Actually it's a bug in the connector, so I think you need to upgrade
the connector to 1.10.1 too, not just Flink itself.

I tried Flink 1.10.0/1.10.1 + flink-kafka-connector 1.10.0 and indeed
reproduced the bug. After upgrading flink-kafka-connector to 1.10.1, the
error disappeared.
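For reference, a minimal sketch of the exactly-once producer setup described in the linked docs, assuming Flink and flink-connector-kafka are both on 1.10.1; broker address, topic and timeout values are placeholders.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceKafkaSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // EXACTLY_ONCE commits transactions on checkpoints, so checkpointing must be enabled.
        env.enableCheckpointing(60_000);

        DataStream<String> stream = env.fromElements("a", "b", "c");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Must not exceed the broker-side transaction.max.timeout.ms.
        props.setProperty("transaction.timeout.ms", "600000");

        KafkaSerializationSchema<String> schema = (element, timestamp) ->
                new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8));

        stream.addSink(new FlinkKafkaProducer<>(
                "output-topic",
                schema,
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("exactly-once kafka sink sketch");
    }
}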

On Fri, Jul 31, 2020 at 7:02 PM Vikash Dat  wrote:

> Thanks for the reply. I am currently using 1.10 but also saw it happens in
> 1.10.1 when experimenting. I have not tried 1.11 since EMR only has up to
> 1.10 at the moment. Are there any known workarounds?
>
> On Fri, Jul 31, 2020 at 02:42 Qingsheng Ren  wrote:
>
>> Hi Vikash,
>>
>> It's a bug about classloader used in `abortTransaction()` method in
>> `FlinkKafkaProducer`, Flink version 1.10.0. I think it has been fixed in
>> 1.10.1 and 1.11 according to FLINK-16262. Are you using Flink version
>> 1.10.0?
>>
>>
>>> Vikash Dat wrote on Thu, Jul 30, 2020 at 9:26 PM:
>>
>>> Has anyone had success with using exactly_once in a kafka producer in
>>> flink?
>>> As of right now I don't think the code shown in the docs
>>> (
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer
>>> )
>>> actually works.
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>
>>
>> --
>> Best Regards,
>>
>> *Qingsheng Ren*
>>
>> Electrical and Computer Engineering
>> Carnegie Mellon University
>>
>> Email: renqs...@gmail.com
>>
>

-- 
Best Regards,

*Qingsheng Ren*

Electrical and Computer Engineering
Carnegie Mellon University

Email: renqs...@gmail.com


Re: flink 1.11.0 connector-jdbc error

2020-08-02 Thread song wang
Yes, that was exactly the cause.

Leonard Xu wrote on Mon, Aug 3, 2020 at 10:26 AM:

> Hi
>
> > On Aug 3, 2020, at 10:16, song wang wrote:
> > 
> > querying integer
> 
> If the MySQL column type is INT UNSIGNED, the corresponding type in Flink is BIGINT.
> Please check whether this is the cause; see [1] for the type mapping.
>
> Best
> Leonard
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#data-type-mapping
> <
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#data-type-mapping
> >


Re: Re: flinksql insert overwrite error: INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-02 Thread chenxuying
Thanks, understood.

On 2020-08-03 10:42:53, "Leonard Xu" wrote:
>If the DB supports an upsert syntax, an UPDATE is executed; if it does not, a DELETE + INSERT is executed. Both MySQL and PG
>support upsert; the underlying SQL statements are:
>
>Database     Upsert Grammar
>MySQL        INSERT .. ON DUPLICATE KEY UPDATE ..
>PostgreSQL   INSERT .. ON CONFLICT .. DO UPDATE SET ..
>
>The MySQL connector does not use REPLACE INTO; it uses ON DUPLICATE KEY UPDATE.
>
>Best,
>Leonard 
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#idempotent-writes
> 
>
>
>
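For illustration, a minimal Java sketch of the upsert behaviour described above, assuming Flink 1.11 and the JDBC connector; table names, URL and credentials are placeholders, and the datagen source exists only to make the snippet self-contained.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcUpsertSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Placeholder source so the INSERT below has something to read.
        tEnv.executeSql(
                "CREATE TABLE some_source (id BIGINT, name STRING) " +
                "WITH ('connector' = 'datagen', 'number-of-rows' = '10')");

        // Declaring a primary key makes the JDBC sink write in upsert mode
        // (ON DUPLICATE KEY UPDATE on MySQL, ON CONFLICT .. DO UPDATE on PostgreSQL).
        tEnv.executeSql(
                "CREATE TABLE mysql_sink (" +
                "  id BIGINT," +
                "  name STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:3306/db'," +
                "  'table-name' = 'my_table'," +
                "  'username' = 'user'," +
                "  'password' = 'pass')");

        // Plain INSERT INTO (not OVERWRITE): rows with an existing id are updated,
        // new ids are inserted. executeSql submits the job asynchronously.
        tEnv.executeSql("INSERT INTO mysql_sink SELECT id, name FROM some_source");
    }
}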
>> On Aug 3, 2020, at 10:33, chenxuying wrote:
>> 
>> Hi, I was just experimenting: we previously used INSERT INTO + a primary key to update DB records, and when I saw the INSERT
>> OVERWRITE syntax I gave it a try. So OVERWRITE currently only supports the Filesystem connector and Hive tables.
>> I would also like to ask: when using INSERT INTO + a primary key, if the key already exists, is the underlying SQL an UPDATE, or a DELETE + INSERT?
>> We previously had a case where UPDATE was too slow and we needed REPLACE INTO; does Flink support that?
>> 
>> On 2020-08-02 09:48:04, "Leonard Xu" wrote:
>>> Hi,
>>> 
>>> This error occurs because the JDBC connector does not support INSERT OVERWRITE. The document you read lists the INSERT syntax
>>> currently supported by Flink SQL, but not every connector supports INSERT OVERWRITE; at the moment only the Filesystem
>>> connector and Hive tables do, and those tables usually have no primary key. Other connectors such as JDBC/ES/HBase do not support
>>> INSERT OVERWRITE yet, but the JDBC/ES/HBase connectors all support upsert writes [1]:
>>> define a PK on the connector table and the result is updated by PK, which should cover the business needs of DB-like systems.
>>> Could you share the scenario that requires INSERT OVERWRITE into a DB?
>>> 
>>> Best
>>> Leonard
>>> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
>>>  
>>> 
>>> 
 On Aug 1, 2020, at 19:20, chenxuying wrote:
 
 Hello
 The Flink 1.11.0 documentation [1] says INSERT OVERWRITE can be used. I tried it, but executing the statement "insert 
 overwrite mysqlsink select a,cast(b as varchar) b from mySource" fails with the following error:
 Exception in thread "main" org.apache.flink.table.api.ValidationException: 
 INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement 
 SupportsOverwrite interface.
 Do I have to write a custom connector, i.e. implement DynamicTableSink?
 
 
 Best,
 chenxuying
 [1] 
 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax
>>> 
>


Flink 1.11.1 fails to consume from a SASL-enabled Kafka: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config

2020-08-02 Thread RS
Hi, 
I am trying to consume from a Kafka cluster that uses SASL.


Contents of jaas.conf:
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin001"
  password="123456";
};


Commands executed:
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/jaas.conf"
./bin/sql-client.sh embedded



CREATE TABLE t1 (
    vendor STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = '127.0.0.1:9092',
    'properties.group.id' = 'g1',
    'properties.sasl.mechanisms' = 'PLAIN',
    'properties.sasl.username' = 'admin001',
    'properties.sasl.password' = '123456',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);



Then the error is:
Flink SQL> select * from t1;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: No serviceName defined in either JAAS or 
Kafka config


How should this be resolved?


Thx



UDF:Type is not supported: ANY

2020-08-02 Thread zilong xiao
Recently, while writing Flink SQL to process data, I found that Flink's built-in functions do not quite cover my needs. Very common functions such as Json2Array and
Json2Map do not seem to be provided out of the box, so they have to be implemented as user-defined functions. When I tried to implement them with Jackson, SQL
validation kept failing with `Type is not supported: ANY`. My guess is that this is related to Java generics: since Java generics are erased, the element type ends up
as Object, which triggers this exception. Flink itself has a string-to-container function, STR_TO_MAP, whose implementation is written in Scala, so I am not sure
whether the exception is really caused by type erasure. If it is, how should a Json2Array / Json2Map UDF be written in Java? Any guidance would be appreciated.

The UDF code is as follows:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.ScalarFunction;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Json2List extends ScalarFunction {

    private static final Logger LOG = LoggerFactory.getLogger(Json2List.class);

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
            .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
            .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true);

    public Json2List() {}

    // Re-serializes each element of a JSON array into its own JSON string.
    public List<String> eval(String param) {
        List<String> result = new ArrayList<>();
        try {
            List<Map<String, Object>> list = OBJECT_MAPPER.readValue(param, List.class);
            for (Map<String, Object> map : list) {
                result.add(OBJECT_MAPPER.writeValueAsString(map));
            }
            return result;
        } catch (JsonProcessingException e) {
            LOG.error("failed to convert json to array, param is: {}", param, e);
        }
        return result;
    }

    @Override
    public TypeInformation<List<String>> getResultType(Class<?>[] signature) {
        return Types.LIST(Types.STRING);
    }
}


Re: Re: flinksql insert overwrite error: INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-02 Thread chenxuying
Hi, I was just experimenting: we previously used INSERT INTO + a primary key to update DB records, and when I saw the INSERT
OVERWRITE syntax I gave it a try. So OVERWRITE currently only supports the Filesystem connector and Hive tables.
I would also like to ask: when using INSERT INTO + a primary key, if the key already exists, is the underlying SQL an UPDATE, or a DELETE + INSERT?
We previously had a case where UPDATE was too slow and we needed REPLACE INTO; does Flink support that?

On 2020-08-02 09:48:04, "Leonard Xu" wrote:
>Hi,
>
>This error occurs because the JDBC connector does not support INSERT OVERWRITE. The document you read lists the INSERT syntax
>currently supported by Flink SQL, but not every connector supports INSERT OVERWRITE; at the moment only the Filesystem
>connector and Hive tables do, and those tables usually have no primary key. Other connectors such as JDBC/ES/HBase do not support INSERT
>OVERWRITE yet, but the JDBC/ES/HBase connectors all support upsert writes [1]:
>define a PK on the connector table and the result is updated by PK, which should cover the business needs of DB-like systems.
>Could you share the scenario that requires INSERT OVERWRITE into a DB?
>
>Best
>Leonard
>[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> 
>
>
>> On Aug 1, 2020, at 19:20, chenxuying wrote:
>> 
>> Hello
>> The Flink 1.11.0 documentation [1] says INSERT OVERWRITE can be used. I tried it, but executing the statement "insert 
>> overwrite mysqlsink select a,cast(b as varchar) b from mySource" fails with the following error:
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>> INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement 
>> SupportsOverwrite interface.
>> Do I have to write a custom connector, i.e. implement DynamicTableSink?
>> 
>> 
>> Best,
>> chenxuying
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax
>


Re: flink 1.11.0 connector-jdbc error

2020-08-02 Thread Leonard Xu
Hi

> On Aug 3, 2020, at 10:16, song wang wrote:
> 
> > querying integer

If the MySQL column type is INT UNSIGNED, the corresponding type in Flink is BIGINT.
Please check whether this is the cause; see [1] for the type mapping.

Best
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#data-type-mapping
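For illustration, a minimal Java sketch of the DDL implied by that mapping; connection details and column names are placeholders, and the column declared BIGINT is the one stored as INT UNSIGNED on the MySQL side.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcTypeMappingSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE orders (" +
                "  order_id BIGINT," +   // MySQL side: INT UNSIGNED
                "  amount INT" +         // MySQL side: INT
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:3306/db'," +
                "  'table-name' = 'orders')");
    }
}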
 


Re: Data preview

2020-08-02 Thread godfrey he
If you want to fetch the query result on the client side for a preview, the API currently supports collecting or printing the execution result directly; see [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#execute-a-query
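For illustration, a minimal Java sketch of such a client-side preview with the 1.11 TableResult API; the datagen table exists only to have something to query.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class PreviewQueryResultSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE demo (id BIGINT, name STRING) " +
                "WITH ('connector' = 'datagen', 'number-of-rows' = '10')");

        TableResult result = tEnv.executeSql("SELECT id, name FROM demo");

        // Stream the rows back to the client for a preview.
        try (CloseableIterator<Row> it = result.collect()) {
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
        // Alternatively, result.print() renders the rows as a table on stdout.
    }
}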

Jeff Zhang wrote on Sat, Aug 1, 2020 at 11:01 PM:

> Apache Zeppelin has its own REST API; you can use it to submit Flink SQL
> and fetch SQL results. The Zeppelin community is currently building a Client API (Zeppelin SDK)
> so that users can call Zeppelin's features more conveniently. See
> https://issues.apache.org/jira/browse/ZEPPELIN-4981
>
> Sample code for reference:
>
> https://github.com/zjffdu/zeppelin/blob/ZEPPELIN-4981/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java#L298
>
> If you are interested in Flink on Zeppelin, you can join the DingTalk group: 32803524
>
>
>
> forideal wrote on Sat, Aug 1, 2020 at 7:49 PM:
>
> > Hello, my friends,
> >
> >
> > Recently I noticed that the Flink docs include the following connectors:
> >   DataGen
> >   Print
> >   BlackHole
> > These make development and debugging much easier. Still, I would like to learn how data preview can be done.
> > For example, if I had a Flink `driver` and used it to submit a SQL statement, I could fetch the data from a ResultSet.
> > That would again be very convenient for our Flink SQL developers.
> > In the community I have already tried Apache Zeppelin, which lets me submit Flink SQL and then watch the refreshed results on the page,
> > but Zeppelin currently does not integrate well into our Flink web IDE. I would like to understand how to implement data preview.
> >
> >
> > Best, forideal
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-02 Thread Shuiqiang Chen
Hi jincheng,

Thanks for the discussion. +1 for the FLIP.

A well-organized documentation will greatly improve the efficiency and
experience for developers.

Best,
Shuiqiang

Hequn Cheng wrote on Sat, Aug 1, 2020 at 8:42 AM:

> Hi Jincheng,
>
> Thanks a lot for raising the discussion. +1 for the FLIP.
>
> I think this will bring big benefits for the PyFlink users. Currently, the
> Python TableAPI document is hidden deeply under the TableAPI tab which
> makes it quite unreadable. Also, the PyFlink documentation is mixed with
> Java/Scala documentation. It is hard for users to have an overview of all
> the PyFlink documents. As more and more functionalities are added into
> PyFlink, I think it's time for us to refactor the document.
>
> Best,
> Hequn
>
>
> On Fri, Jul 31, 2020 at 3:43 PM Marta Paes Moreira 
> wrote:
>
>> Hi, Jincheng!
>>
>> Thanks for creating this detailed FLIP, it will make a big difference in
>> the experience of Python developers using Flink. I'm interested in
>> contributing to this work, so I'll reach out to you offline!
>>
>> Also, thanks for sharing some information on the adoption of PyFlink, it's
>> great to see that there are already production users.
>>
>> Marta
>>
>> On Fri, Jul 31, 2020 at 5:35 AM Xingbo Huang  wrote:
>>
>> > Hi Jincheng,
>> >
>> > Thanks a lot for bringing up this discussion and the proposal.
>> >
>> > Big +1 for improving the structure of PyFlink doc.
>> >
>> > It will be very friendly to give PyFlink users a unified entrance to
>> learn
>> > PyFlink documents.
>> >
>> > Best,
>> > Xingbo
>> >
>> > Dian Fu wrote on Fri, Jul 31, 2020 at 11:00 AM:
>> >
>> >> Hi Jincheng,
>> >>
>> >> Thanks a lot for bringing up this discussion and the proposal. +1 to
>> >> improve the Python API doc.
>> >>
>> >> I have received a lot of feedback from PyFlink beginners about
>> >> the PyFlink doc, e.g. the materials are too few, the Python doc is
>> mixed
>> >> with the Java doc and it's not easy to find the docs they want.
>> >>
>> >> I think it would greatly improve the user experience if we could have one
>> >> place which includes most of the knowledge PyFlink users should know.
>> >>
>> >> Regards,
>> >> Dian
>> >>
>> >> On Jul 31, 2020, at 10:14 AM, jincheng sun wrote:
>> >>
>> >> Hi folks,
>> >>
>> >> Since the release of Flink 1.11, users of PyFlink have continued to
>> grow.
>> >> As far as I know, many companies have used PyFlink for data
>> >> analysis, and operation and maintenance monitoring businesses have been put
>> into
>> >> production (such as 聚美优品 [1] (Jumei), 浙江墨芷 [2] (Mozhi), etc.). According
>> to
>> >> the feedback we received, current documentation is not very friendly to
>> >> PyFlink users. There are two shortcomings:
>> >>
>> >> - Python related content is mixed in the Java/Scala documentation,
>> which
>> >> makes it difficult for users who only focus on PyFlink to read.
>> >> - There is already a "Python Table API" section in the Table API
>> document
>> >> to store PyFlink documents, but the number of articles is small and the
>> >> content is fragmented. It is difficult for beginners to learn from it.
>> >>
>> >> In addition, FLIP-130 introduced the Python DataStream API. Many
>> >> documents will be added for those new APIs. In order to increase the
>> >> readability and maintainability of the PyFlink document, Wei Zhong and
>> I
>> >> have discussed offline and would like to rework it via this FLIP.
>> >>
>> >> We will rework the document around the following three objectives:
>> >>
>> >> - Add a separate section for Python API under the "Application
>> >> Development" section.
>> >> - Restructure current Python documentation to a brand new structure to
>> >> ensure complete content and friendly to beginners.
>> >> - Improve the documents shared by Python/Java/Scala to make it more
>> >> friendly to Python users and without affecting Java/Scala users.
>> >>
>> >> More detail can be found in the FLIP-133:
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-133%3A+Rework+PyFlink+Documentation
>> >>
>> >> Best,
>> >> Jincheng
>> >>
>> >> [1] https://mp.weixin.qq.com/s/zVsBIs1ZEFe4atYUYtZpRg
>> >> [2] https://mp.weixin.qq.com/s/R4p_a2TWGpESBWr3pLtM2g
>> >>
>> >>
>> >>
>>
>


Re: How does RocksDBKeyedStateBackend write to disk

2020-08-02 Thread jun su
Hi Congxian,

Thanks for the reply; I will look into it further. Much appreciated.

Congxian Qiu wrote on Sun, Aug 2, 2020 at 2:11 PM:

> Hi jiafu,
> RocksDB flushes to disk on its own (when the write buffer is full it is flushed to disk; see the RocksDB docs for details). In addition, during a
> snapshot the write buffer is flushed once to guarantee consistency.
> The screenshot above is a write batch operation in Flink's wrapper: this method accumulates a batch of operations before writing them to
> rocksdb, which is not the same thing as RocksDB flushing to disk.
> Best,
> Congxian
>
>
> jun su  于2020年7月31日周五 下午4:57写道:
>
> > hi,
> >
> > I see that the RocksDBWriteBatchWrapper class has a flushIfNeeded() method; is this it?
> >
> >  private void flushIfNeeded() throws RocksDBException {
> > boolean needFlush = batch.count() == capacity || (batchSize > 0 &&
> > getDataSize() >= batchSize);
> > if (needFlush) {
> > flush();
> > }
> > }
> >
> > batchSize comes from the state.backend.rocksdb.write-batch-size configuration parameter
> >
> > jiafu <530496...@qq.com> wrote on Fri, Jul 31, 2020 at 4:41 PM:
> >
> > >
> > >
> >
> When the write buffer is full it is flushed to disk. When a checkpoint starts there is a snapshot process that makes RocksDB take a checkpoint and then flush the data to disk as SST files.
> > >
> > >
> > >
> > >
> > > -------- Original message --------
> > > From: "user-zh" <sujun891...@gmail.com>
> > > Sent: Friday, July 31, 2020, 4:37 PM
> > > To: "user-zh"
> > > Subject: How does RocksDBKeyedStateBackend write to disk
> > >
> > >
> > >
> > > hi all,
> > >
> > > When does RocksDBKeyedStateBackend serialize state to disk? At window end time, or at the configured checkpoint interval? Thanks.
> > >
> > > --
> > > Best,
> > > Jun Su
> >
> >
> >
> > --
> > Best,
> > Jun Su
> >
>


-- 
Best,
Jun Su


Re: [Flink Unit Tests] Unit test for Flink streaming codes

2020-08-02 Thread David Anderson
Vijay,

There's a section of the docs that describes some strategies for writing
tests of various types, and it includes some Scala examples [1].

There are also some nice examples from Konstantin Knauf in [2], though they
are mostly in Java.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html

[2] https://github.com/knaufk/flink-testing-pyramid

Best,
David

On Sun, Aug 2, 2020 at 12:14 PM Arvid Heise  wrote:

> Hi Vijay,
>
> Any unit test of Flink operators is actually an IT case as it involves a
> large portion of the stack. A real unit test, would be over a factored out
> logic class.
>
> Similar to Niels, I'd recommend to use simple sources (env.fromElements)
> and sinks to inject the data and retrieve the data and put the logic under
> test in the middle. That may be a part of your pipeline or even the whole
> pipeline.
>
> If you want to have some scala inspiration, have a look at:
>
> https://github.com/apache/flink/blob/5f0183fe79d10ac36101f60f2589062a39630f96/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala#L56-L82
> . It's on table API but should be quite easy to translate to datastream API
> if needed.
>
> On Sat, Aug 1, 2020 at 4:03 PM Niels Basjes  wrote:
>
>> No, I only have Java.
>>
>> On Fri, 31 Jul 2020, 21:57 Vijayendra Yadav, 
>> wrote:
>>
>>> Thank You Niels. Would you have something for the scala object class.
>>> Say for example if I want to implement a unit test ( not integration test)
>>> for below code or similar  :
>>>
>>>
>>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
>>>
>>> Regards,
>>> Vijay
>>>
>>> On Fri, Jul 31, 2020 at 12:22 PM Niels Basjes  wrote:
>>>
 Does this test in one of my own projects do what you are looking for?


 https://github.com/nielsbasjes/yauaa/blob/1e1ceb85c507134614186e3e60952112a2daabff/udfs/flink/src/test/java/nl/basjes/parse/useragent/flink/TestUserAgentAnalysisMapperClass.java#L107


 On Fri, 31 Jul 2020, 20:20 Vijayendra Yadav, 
 wrote:

> Hi Team,
>
> Looking for some help and reference code / material to implement unit
> tests of possible scenarios in Flink *streaming *Code that should
> assert specific cases.
>
> Regards,
> Vijay
>

>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: sporadic "Insufficient no of network buffers" issue

2020-08-02 Thread Rahul Patwari
After debugging further, it seems this issue is caused by the scheduling
strategy.
Depending on which tasks are assigned to a task manager, the memory
configured for network buffers probably runs out.

Through these references (FLINK-12122, FLINK-15031, and the Flink 1.10 release notes)
we came to know that the scheduling strategy changed in 1.5.0 (FLIP-6) compared to 1.4.2,
and that since 1.9.2 the old behavior can be restored through the scheduling configuration
cluster.evenly-spread-out-slots: true.

The "spread out" strategy could definitely help in this case.
Can you please confirm our findings and suggest possible ways to mitigate this issue?

Rahul

On Sat, Aug 1, 2020 at 9:24 PM Rahul Patwari 
wrote:

> From the metrics in Prometheus, we observed that the minimum
> AvailableMemorySegments out of all the task managers is 4.5k when the
> exception was thrown.
> So there were enough network buffers.
> correction to the configs provided above: each TM CPU has 8 cores.
>
> Apart from having fewer network buffers, can something else trigger this
> issue?
> Also, is it expected that the issue is sporadic?
>
> Rahul
>
> On Sat, Aug 1, 2020 at 12:24 PM Ivan Yang  wrote:
>
>> Yes, increase taskmanager.network.memory.fraction in your case. Also,
>> reducing the parallelism will reduce the number of network buffers required for
>> your job. I never used 1.4.x, so I don't know about it.
>>
>> Ivan
>>
>> On Jul 31, 2020, at 11:37 PM, Rahul Patwari 
>> wrote:
>>
>> Thanks for your reply, Ivan.
>>
>> I think taskmanager.network.memory.max is by default 1GB.
>> In my case, the network buffers memory is 13112 * 32768 = around 400MB
>> which is 10% of the TM memory as by default
>> taskmanager.network.memory.fraction is 0.1.
>> Do you mean to increase taskmanager.network.memory.fraction?
>>
>>1. If Flink is upgraded from 1.4.2 to 1.8.2 does the application
>>need more network buffers?
>>2. Can this issue happen sporadically? sometimes this issue is not
>>seen when the job manager is restarted.
>>
>> I am thinking whether having fewer network buffers is the root cause (or)
>> if the root cause is something else which triggers this issue.
>>
>> On Sat, Aug 1, 2020 at 9:36 AM Ivan Yang  wrote:
>>
>>> Hi Rahul,
>>>
>>> Try to increase taskmanager.network.memory.max to 1GB, basically double
>>> what you have now. However, you only have 4GB RAM for the entire TM, and it seems
>>> out of proportion to have 1GB of network buffers with 4GB total RAM. Reducing the
>>> number of shuffles will require fewer network buffers. But if your job needs
>>> the shuffling, then you may consider adding more memory to the TM.
>>>
>>> Thanks,
>>> Ivan
>>>
>>> On Jul 31, 2020, at 2:02 PM, Rahul Patwari 
>>> wrote:
>>>
>>> Hi,
>>>
>>> We are observing "Insufficient number of Network Buffers" issue
>>> Sporadically when Flink is upgraded from 1.4.2 to 1.8.2.
>>> The state of the tasks with this issue translated from DEPLOYING to
>>> FAILED.
>>> Whenever this issue occurs, the job manager restarts. Sometimes, the
>>> issue goes away after the restart.
>>> As we are not getting the issue consistently, we are in a dilemma of
>>> whether to change the memory configurations or not.
>>>
>>> Min recommended No. of Network Buffers: (8 * 8) * 8 * 4 = 2048
>>> The exception says that 13112 no. of network buffers are present, which
>>> is 6x the recommendation.
>>>
>>> Is reducing the no. of shuffles the only way to reduce the no. of
>>> network buffers required?
>>>
>>> Thanks,
>>> Rahul
>>>
>>> configs:
>>> env: Kubernetes
>>> Flink: 1.8.2
>>> using default configs for memory.fraction, memory.min, memory.max.
>>> using 8 TM, 8 slots/TM
>>> Each TM is running with 1 core, 4 GB Memory.
>>>
>>> Exception:
>>> java.io.IOException: Insufficient number of network buffers: required 2,
>>> but only 0 available. The total number of network buffers is currently set
>>> to 13112 of 32768 bytes each. You can increase this number by setting the
>>> configuration keys 'taskmanager.network.memory.fraction',
>>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>>> at
>>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:138)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:311)
>>> at
>>> org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:271)
>>> at
>>> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
>>> at 

Reply: Re: Flink Kafka-to-MySQL End-To-End Exactly-Once

2020-08-02 Thread ??????
The MySQL side is the problem because of the Connection:
if Connection A opens transaction T and fails before commit() is called, the transaction that
TwoPhaseCommitSinkFunction keeps in pendingCommitTransactions would have to be committed later
through a different Connection B.

Does MySQL support 2PC for this case?





-------- Original message --------
From: "user-zh"
https://github.com/lusecond/flink_help --depth=1
>
> TwoPhaseCommitSinkFunction's four methods: beginTransaction, preCommit, commit, abort
> jdbc

Re: [Flink Unit Tests] Unit test for Flink streaming codes

2020-08-02 Thread Arvid Heise
Hi Vijay,

Any unit test of Flink operators is actually an IT case, as it involves a
large portion of the stack. A real unit test would be over a factored-out
logic class.

Similar to Niels, I'd recommend to use simple sources (env.fromElements)
and sinks to inject the data and retrieve the data and put the logic under
test in the middle. That may be a part of your pipeline or even the whole
pipeline.

If you want to have some scala inspiration, have a look at:
https://github.com/apache/flink/blob/5f0183fe79d10ac36101f60f2589062a39630f96/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala#L56-L82
. It's on table API but should be quite easy to translate to datastream API
if needed.
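
A minimal Java sketch of that pattern follows; the uppercasing map is a stand-in for the real logic under test, and DataStreamUtils.collect is one way to get the results back to the client for assertions.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PipelineLogicTestSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Bounded test input instead of a real Kafka/socket source.
        DataStream<String> input = env.fromElements("flink", "testing");

        // The logic under test.
        DataStream<String> result = input.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        });

        // DataStreamUtils.collect executes the job and streams results back to the client.
        Iterator<String> it = DataStreamUtils.collect(result);
        List<String> output = new ArrayList<>();
        it.forEachRemaining(output::add);

        if (!output.equals(Arrays.asList("FLINK", "TESTING"))) {
            throw new AssertionError("unexpected output: " + output);
        }
        System.out.println("test passed: " + output);
    }
}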

On Sat, Aug 1, 2020 at 4:03 PM Niels Basjes  wrote:

> No, I only have Java.
>
> On Fri, 31 Jul 2020, 21:57 Vijayendra Yadav, 
> wrote:
>
>> Thank You Niels. Would you have something for the scala object class.
>> Say for example if I want to implement a unit test ( not integration test)
>> for below code or similar  :
>>
>>
>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
>>
>> Regards,
>> Vijay
>>
>> On Fri, Jul 31, 2020 at 12:22 PM Niels Basjes  wrote:
>>
>>> Does this test in one of my own projects do what you are looking for?
>>>
>>>
>>> https://github.com/nielsbasjes/yauaa/blob/1e1ceb85c507134614186e3e60952112a2daabff/udfs/flink/src/test/java/nl/basjes/parse/useragent/flink/TestUserAgentAnalysisMapperClass.java#L107
>>>
>>>
>>> On Fri, 31 Jul 2020, 20:20 Vijayendra Yadav, 
>>> wrote:
>>>
 Hi Team,

 Looking for some help and reference code / material to implement unit
 tests of possible scenarios in Flink *streaming *Code that should
 assert specific cases.

 Regards,
 Vijay

>>>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: How does RocksDBKeyedStateBackend write to disk

2020-08-02 Thread Congxian Qiu
Hi jiafu,
RocksDB flushes to disk on its own (when the write buffer is full it is flushed to disk; see the RocksDB docs for details). In addition, during a
snapshot the write buffer is flushed once to guarantee consistency.
The screenshot above is a write batch operation in Flink's wrapper: this method accumulates a batch of operations before writing them to
rocksdb, which is not the same thing as RocksDB flushing to disk.
Best,
Congxian
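
As a side note, a minimal sketch (an assumption for local experiments, not something from this thread) of where the write-batch-size option from the quoted code can be set; it only controls how many operations Flink's wrapper batches before handing them to RocksDB, not when RocksDB itself flushes to disk.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbWriteBatchConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Size of the write batch used by Flink's RocksDB wrapper (see flushIfNeeded above).
        conf.setString("state.backend.rocksdb.write-batch-size", "4 mb");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, conf);
        // ... configure the RocksDB state backend and build the job as usual ...
    }
}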


jun su wrote on Fri, Jul 31, 2020 at 4:57 PM:

> hi,
>
> I see that the RocksDBWriteBatchWrapper class has a flushIfNeeded() method; is this it?
>
>  private void flushIfNeeded() throws RocksDBException {
> boolean needFlush = batch.count() == capacity || (batchSize > 0 &&
> getDataSize() >= batchSize);
> if (needFlush) {
> flush();
> }
> }
>
> batchSize comes from the state.backend.rocksdb.write-batch-size configuration parameter
>
> jiafu <530496...@qq.com> wrote on Fri, Jul 31, 2020 at 4:41 PM:
>
> >
> >
> When the write buffer is full it is flushed to disk. When a checkpoint starts there is a snapshot process that makes RocksDB take a checkpoint and then flush the data to disk as SST files.
> >
> >
> >
> >
> > -------- Original message --------
> > From: "user-zh" <sujun891...@gmail.com>
> > Sent: Friday, July 31, 2020, 4:37 PM
> > To: "user-zh"
> > Subject: How does RocksDBKeyedStateBackend write to disk
> >
> >
> >
> > hi all,
> >
> > When does RocksDBKeyedStateBackend serialize state to disk? At window end time, or at the configured checkpoint interval? Thanks.
> >
> > --
> > Best,
> > Jun Su
>
>
>
> --
> Best,
> Jun Su
>