Flink SQL对同一kafka source进行多sink操作时会报javax.management.InstanceAlreadyExistsException异常

2023-06-02 Thread Jeff
sql示例:
create table kafka_source() with ('connector'='kafka');
insert into sink_table1 select * from kafka_source;
insert into sink_table2 select * from kafka_source;




报错内容如下:
javax.management.InstanceAlreadyExistsException: 
kafka.admin.client:type=app-info,id=jxqy_customer_service-enumerator-admin-client
\tat 
java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
\tat 
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
\tat 
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
\tat 
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
\tat 
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
\tat 
java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
\tat 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
\tat 
org.apache.kafka.clients.admin.KafkaAdminClient.(KafkaAdminClient.java:500)
\tat 
org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:444)
\tat org.apache.kafka.clients.admin.Admin.create(Admin.java:59)
\tat org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39)
\tat 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getKafkaAdminClient(KafkaSourceEnumerator.java:410)
\tat 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.start(KafkaSourceEnumerator.java:151)
\tat 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$start$1(SourceCoordinator.java:209)
\tat 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406)
\tat 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
\tat 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
\tat 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
\tat 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
\tat 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
\tat java.base/java.lang.Thread.run(Thread.java:829)

Re:Re: sink mysql id自增表数据会丢失

2023-04-18 Thread Jeff
在sink时指定字段不可以不包括自增主键的列。

















在 2023-04-17 07:29:16,"Shammon FY"  写道:
>Hi
>
>如果想使用mysql的自增主键,应该是在插入的时候不要写自增主键的列吧,可以在insert的时候直接指定需要插入的列试试?
>
>On Sun, Apr 16, 2023 at 7:58 PM Jeff  wrote:
>
>> sink数据到mysql catalog内的表时,当表只一个自增主键id无其唯一索引时,同一批写入的数据只会保存一条,其它数据会丢失。
>>
>>
>>  mysql内表ddl:
>>
>> create table test (id bigint primary key auto_increment , passport
>> varchar);
>>
>>
>> flink sql:
>> insert into mysql_catalog.test select 0, passport from source_table;
>>
>> 之所以select 0是表示使用物理表的自增值。


udf函数不能使用DataTypeHint("Row>")

2023-04-18 Thread Jeff
在自定义函数中使用DataTypeHint("Row>")时报错,错误内容为:


Caused by: java.lang.ClassCastException: class [I cannot be cast to class 
[Ljava.lang.Object; ([I and [Ljava.lang.Object; are in module java.base of 
loader 'bootstrap')
\tat 
org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:40)
\tat 
org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
\tat 
org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:75)
\tat 
org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
\tat 
org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
\tat StreamExecCalc$251.processElement_split9(Unknown Source)
\tat StreamExecCalc$251.processElement(Unknown Source)
\tat 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)


函数内容如下:
@DataTypeHint("Row>")
 public  Row eval() {
   int[] i = new int[3];
 return Row.of(i);
}


测试其它简单类型时就不会报这个错,所以不是环境问题。

sink mysql id自增表数据会丢失

2023-04-16 Thread Jeff
sink数据到mysql catalog内的表时,当表只一个自增主键id无其唯一索引时,同一批写入的数据只会保存一条,其它数据会丢失。 


 mysql内表ddl:

create table test (id bigint primary key auto_increment , passport varchar);


flink sql:
insert into mysql_catalog.test select 0, passport from source_table;
 
之所以select 0是表示使用物理表的自增值。

Re:Re: Re: 我上报的一个sql bug没人处理怎么办?

2023-03-22 Thread Jeff
这个方法有效,多谢

















在 2023-03-22 17:11:19,"Jane Chan"  写道:
>Hi,
>
>如回复所述, 如果不想切换版本, 在 1.15 上可以尝试手动 cast 'abc' 字段为 varchar 来绕过这个问题
>map ['msg_code','0', 'msg_reason', cast('abc' as string)]
>
>如果不想修改 SQL, 目前只能手动编译出 release-1.17 分支, 编译方法参考 [1]
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/
>
>祝好!
>Jane
>
>On Wed, Mar 22, 2023 at 6:04 PM Jeff  wrote:
>
>> 通过读calcite1.27.0相关源码发现它已经修复了,但我使用的是flink 1.15无法直接使用1.27.0,所以只能使用本地编译的版本么?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2023-03-22 10:41:42,"Shuo Cheng"  写道:
>> >Hi,
>> >
>> >如果你知道问题出现在哪儿, 可以自己提个 PR 哦.
>> >
>> >Sincerely,
>> >Shuo
>> >
>> >On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
>> >
>> >> 复制执行我提供的两个sql就一定会复现!
>> >> 不管哪个flink版本一定都会有这个问题,因为它们都是使用calcite 1.26.0。
>> >> 这个问题是这个版本calcite引起的。
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2023-03-22 09:28:17,"Jeff"  写道:
>> >> >bug地址:
>> >> >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
>> >> >
>> >> >
>> >> >bug详细内容:
>> >> >the values of map are truncated by the CASE WHEN
>> function.
>> >> >// sql
>> >> >create table test (a map) with ('connector'='print');
>> >> >insert into test  select * from (values(case when true then
>> >> map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc']
>> >> end));
>> >> >
>> >> >the result:
>> >> >
>> >> >+I[{test=123}]
>> >> >
>> >> >We hope the value of result is '123456789', but I get '123', the length
>> >> is limited by 'abc'.
>> >>
>>


Re:Re: Re: 我上报的一个sql bug没人处理怎么办?

2023-03-22 Thread Jeff
试过了,不兼容,1.27.0都不兼容

















在 2023-03-22 18:04:17,"tison"  写道:
>如果 calcite 层的接口不变,直接替换 jar 包或许也可行?不确定从 1.27 -> 1.29 有没有不兼容的情况。
>
>Best,
>tison.
>
>
>Jane Chan  于2023年3月22日周三 18:11写道:
>
>> Hi,
>>
>> 如回复所述, 如果不想切换版本, 在 1.15 上可以尝试手动 cast 'abc' 字段为 varchar 来绕过这个问题
>> map ['msg_code','0', 'msg_reason', cast('abc' as string)]
>>
>> 如果不想修改 SQL, 目前只能手动编译出 release-1.17 分支, 编译方法参考 [1]
>>
>> [1]
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/
>>
>> 祝好!
>> Jane
>>
>> On Wed, Mar 22, 2023 at 6:04 PM Jeff  wrote:
>>
>> > 通过读calcite1.27.0相关源码发现它已经修复了,但我使用的是flink 1.15无法直接使用1.27.0,所以只能使用本地编译的版本么?
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > 在 2023-03-22 10:41:42,"Shuo Cheng"  写道:
>> > >Hi,
>> > >
>> > >如果你知道问题出现在哪儿, 可以自己提个 PR 哦.
>> > >
>> > >Sincerely,
>> > >Shuo
>> > >
>> > >On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
>> > >
>> > >> 复制执行我提供的两个sql就一定会复现!
>> > >> 不管哪个flink版本一定都会有这个问题,因为它们都是使用calcite 1.26.0。
>> > >> 这个问题是这个版本calcite引起的。
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >> 在 2023-03-22 09:28:17,"Jeff"  写道:
>> > >> >bug地址:
>> > >> >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
>> > >> >
>> > >> >
>> > >> >bug详细内容:
>> > >> >the values of map are truncated by the CASE WHEN
>> > function.
>> > >> >// sql
>> > >> >create table test (a map) with
>> ('connector'='print');
>> > >> >insert into test  select * from (values(case when true then
>> > >> map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc']
>> > >> end));
>> > >> >
>> > >> >the result:
>> > >> >
>> > >> >+I[{test=123}]
>> > >> >
>> > >> >We hope the value of result is '123456789', but I get '123', the
>> length
>> > >> is limited by 'abc'.
>> > >>
>> >
>>


Re:Re: 我上报的一个sql bug没人处理怎么办?

2023-03-22 Thread Jeff
通过读calcite1.27.0相关源码发现它已经修复了,但我使用的是flink 1.15无法直接使用1.27.0,所以只能使用本地编译的版本么?



















在 2023-03-22 10:41:42,"Shuo Cheng"  写道:
>Hi,
>
>如果你知道问题出现在哪儿, 可以自己提个 PR 哦.
>
>Sincerely,
>Shuo
>
>On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
>
>> 复制执行我提供的两个sql就一定会复现!
>> 不管哪个flink版本一定都会有这个问题,因为它们都是使用calcite 1.26.0。
>> 这个问题是这个版本calcite引起的。
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2023-03-22 09:28:17,"Jeff"  写道:
>> >bug地址:
>> >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
>> >
>> >
>> >bug详细内容:
>> >the values of map are truncated by the CASE WHEN function.
>> >// sql
>> >create table test (a map) with ('connector'='print');
>> >insert into test  select * from (values(case when true then
>> map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc']
>> end));
>> >
>> >the result:
>> >
>> >+I[{test=123}]
>> >
>> >We hope the value of result is '123456789', but I get '123', the length
>> is limited by 'abc'.
>>


Re:我上报的一个sql bug没人处理怎么办?

2023-03-21 Thread Jeff
复制执行我提供的两个sql就一定会复现!
不管哪个flink版本一定都会有这个问题,因为它们都是使用calcite 1.26.0。
这个问题是这个版本calcite引起的。

















在 2023-03-22 09:28:17,"Jeff"  写道:
>bug地址:
>https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
>
>
>bug详细内容:
>the values of map are truncated by the CASE WHEN function.
>// sql
>create table test (a map) with ('connector'='print');
>insert into test  select * from (values(case when true then 
>map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] end));
>
>the result:
>
>+I[{test=123}] 
>
>We hope the value of result is '123456789', but I get '123', the length is 
>limited by 'abc'.


我上报的一个sql bug没人处理怎么办?

2023-03-21 Thread Jeff
bug地址:
https://issues.apache.org/jira/browse/FLINK-31375?filter=-2


bug详细内容:
the values of map are truncated by the CASE WHEN function.
// sql
create table test (a map) with ('connector'='print');
insert into test  select * from (values(case when true then 
map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] end));

the result:

+I[{test=123}] 

We hope the value of result is '123456789', but I get '123', the length is 
limited by 'abc'.

Re:接收Http请求与flink如何建立联系

2022-07-27 Thread Jeff
是想把动态参数传给正在执行的算子?

















在 2022-07-28 08:16:40,"张锴"  写道:
>flink版本1.13.2
>想通过http请求的方式将参数传给flink,这个怎么实现?


hive catalog不支持jdk11

2022-07-26 Thread Jeff
flink 1.15.1在jdk8的时能正常使用hive 
catalog(3.1.3),但升级到jdk11后有版本问题,通过报错上网查询到hive目前不支持jdk11,请问这个有什么解决方案呢?

Re: Community fork of Flink Scala API for Scala 2.12/2.13/3.x

2022-05-12 Thread Jeff Zhang
That's true scala shell is removed from flink . Fortunately, Apache
Zeppelin has its own scala repl for Flink. So if Flink can support scala
2.13, I am wondering whether it is possible to integrate it into scala
shell so that user can run flink scala code in notebook like spark.

On Thu, May 12, 2022 at 11:06 PM Roman Grebennikov  wrote:

> Hi,
>
> AFAIK scala REPL was removed completely in Flink 1.15 (
> https://issues.apache.org/jira/browse/FLINK-24360), so there is nothing
> to cross-build.
>
> Roman Grebennikov | g...@dfdx.me
>
>
> On Thu, May 12, 2022, at 14:55, Jeff Zhang wrote:
>
> Great work Roman, do you think it is possible to run in scala shell as
> well?
>
> On Thu, May 12, 2022 at 10:43 PM Roman Grebennikov  wrote:
>
>
> Hello,
>
> As far as I understand discussions in this mailist, now there is almost no
> people maintaining the official Scala API in Apache Flink. Due to some
> technical complexities it will be probably stuck for a very long time on
> Scala 2.12 (which is not EOL yet, but quite close to):
> * Traversable serializer relies a lot on CanBuildFrom (so it's read and
> compiled on restore), which is missing in Scala 2.13 and 3.x - migrating
> off from this approach maintaining a savepoint compatibility can be quite a
> complex task.
> * Scala API uses an implicitly generated TypeInformation, which is
> generated by a giant scary mkTypeInfo macro, which should be completely
> rewritten for Scala 3.x.
>
> But even in the current state, scala support in Flink has some issues with
> ADT (sealed traits, popular data modelling pattern) not being natively
> supported, so if you use them, you have to fall back to Kryo, which is not
> that fast: we've seed 3x-4x throughput drops in performance tests.
>
> In my current company we made a library (
> https://github.com/findify/flink-adt) which used Magnolia (
> https://github.com/softwaremill/magnolia) to do all the compile-time
> TypeInformation generation to make Scala ADT nice & fast in Flink. With a
> couple of community contributions it was now possible to cross-build it
> also for scala3.
>
> As Flink 1.15 core is scala free, we extracted the DataStream part of
> Flink Scala API into a separate project, glued it together with flink-adt
> and ClosureCleaner from Spark 3.2 (supporting Scala 2.13 and jvm17) and
> cross-compiled it for 2.12/2.13/3.x. You can check out the result on this
> github project: https://github.com/findify/flink-scala-api
>
> So technically speaking, now it's possible to migrate a scala flink job
> from 2.12 to 3.x with:
> * replace flink-streaming-scala dependency with flink-scala-api (optional,
> both libs can co-exist in classpath on 2.12)
> * replace all imports of org.apache.flink.streaming.api.scala._ with ones
> from the new library
> * rebuild the job for 3.x
>
> The main drawback is that there is no savepoint compatibility due to
> CanBuildFrom and different way of handling ADTs. But if you can afford
> re-bootstrapping the state - migration is quite straightforward.
>
> The README on github https://github.com/findify/flink-scala-api#readme
> has some more details on how and why this project was done in this way. And
> the project is a bit experimental, so if you're interested in scala3 on
> Flink, you're welcome to share your feedback and ideas.
>
> with best regards,
> Roman Grebennikov | g...@dfdx.me
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>

-- 
Best Regards

Jeff Zhang


Re: Community fork of Flink Scala API for Scala 2.12/2.13/3.x

2022-05-12 Thread Jeff Zhang
Great work Roman, do you think it is possible to run in scala shell as well?

On Thu, May 12, 2022 at 10:43 PM Roman Grebennikov  wrote:

> Hello,
>
> As far as I understand discussions in this mailist, now there is almost no
> people maintaining the official Scala API in Apache Flink. Due to some
> technical complexities it will be probably stuck for a very long time on
> Scala 2.12 (which is not EOL yet, but quite close to):
> * Traversable serializer relies a lot on CanBuildFrom (so it's read and
> compiled on restore), which is missing in Scala 2.13 and 3.x - migrating
> off from this approach maintaining a savepoint compatibility can be quite a
> complex task.
> * Scala API uses an implicitly generated TypeInformation, which is
> generated by a giant scary mkTypeInfo macro, which should be completely
> rewritten for Scala 3.x.
>
> But even in the current state, scala support in Flink has some issues with
> ADT (sealed traits, popular data modelling pattern) not being natively
> supported, so if you use them, you have to fall back to Kryo, which is not
> that fast: we've seed 3x-4x throughput drops in performance tests.
>
> In my current company we made a library (
> https://github.com/findify/flink-adt) which used Magnolia (
> https://github.com/softwaremill/magnolia) to do all the compile-time
> TypeInformation generation to make Scala ADT nice & fast in Flink. With a
> couple of community contributions it was now possible to cross-build it
> also for scala3.
>
> As Flink 1.15 core is scala free, we extracted the DataStream part of
> Flink Scala API into a separate project, glued it together with flink-adt
> and ClosureCleaner from Spark 3.2 (supporting Scala 2.13 and jvm17) and
> cross-compiled it for 2.12/2.13/3.x. You can check out the result on this
> github project: https://github.com/findify/flink-scala-api
>
> So technically speaking, now it's possible to migrate a scala flink job
> from 2.12 to 3.x with:
> * replace flink-streaming-scala dependency with flink-scala-api (optional,
> both libs can co-exist in classpath on 2.12)
> * replace all imports of org.apache.flink.streaming.api.scala._ with ones
> from the new library
> * rebuild the job for 3.x
>
> The main drawback is that there is no savepoint compatibility due to
> CanBuildFrom and different way of handling ADTs. But if you can afford
> re-bootstrapping the state - migration is quite straightforward.
>
> The README on github https://github.com/findify/flink-scala-api#readme
> has some more details on how and why this project was done in this way. And
> the project is a bit experimental, so if you're interested in scala3 on
> Flink, you're welcome to share your feedback and ideas.
>
> with best regards,
> Roman Grebennikov | g...@dfdx.me
>
>

-- 
Best Regards

Jeff Zhang


Re: Unable to start sql-client when putting flink-table-planner_2.12-1.15.0.jar to lib folder

2022-05-08 Thread Jeff Zhang
Thanks Yuxia, that works.  Does that mean for one flink distribution, I can
either use java or use scala ? If so, it seems not user friendly.



On Sun, May 8, 2022 at 10:40 AM yuxia  wrote:

> Hi, you can move the flink-table-planner-loader to the /opt.  See more in
> https://issues.apache.org/jira/browse/FLINK-25128
>
>
> Best regards,
> Yuxia
>
> --
> *发件人: *"Jeff Zhang" 
> *收件人: *"User" 
> *发送时间: *星期六, 2022年 5 月 07日 下午 10:05:55
> *主题: *Unable to start sql-client when putting
> flink-table-planner_2.12-1.15.0.jar to lib folder
>
> Hi folks,
> It looks like flink 1.15 changes its binary distribution because of scala
> free. The flink-table-planner_2.12-1.15.0.jar is put under the opt folder.
> Now I would like to use it for my scala flink app, so I move it to the lib
> folder, but after that, I can not start sql-client. Is it expected ? Here's
> the error I see
>
>
> -
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected exception.
> This is a bug. Please consider filing an issue.
> at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
> Caused by: org.apache.flink.table.api.TableException: Could not
> instantiate the executor. Make sure a planner module is on the classpath
> at
> org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:163)
> at
> org.apache.flink.table.client.gateway.context.ExecutionContext.createTableEnvironment(ExecutionContext.java:111)
> at
> org.apache.flink.table.client.gateway.context.ExecutionContext.(ExecutionContext.java:66)
> at
> org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:247)
> at
> org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:87)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:87)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
> at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
> ... 1 more
> Caused by: org.apache.flink.table.api.ValidationException: Multiple
> factories for identifier 'default' that implement
> 'org.apache.flink.table.delegation.ExecutorFactory' found in the classpath.
>
> Ambiguous factory classes are:
>
> org.apache.flink.table.planner.delegation.DefaultExecutorFactory
> org.apache.flink.table.planner.loader.DelegateExecutorFactory
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:553)
> at
> org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:154)
> ... 8 more
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
>

-- 
Best Regards

Jeff Zhang


Unable to start sql-client when putting flink-table-planner_2.12-1.15.0.jar to lib folder

2022-05-07 Thread Jeff Zhang
Hi folks,

It looks like flink 1.15 changes its binary distribution because of scala
free. The flink-table-planner_2.12-1.15.0.jar is put under the opt folder.
Now I would like to use it for my scala flink app, so I move it to the lib
folder, but after that, I can not start sql-client. Is it expected ? Here's
the error I see

-
Exception in thread "main"
org.apache.flink.table.client.SqlClientException: Unexpected exception.
This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: org.apache.flink.table.api.TableException: Could not instantiate
the executor. Make sure a planner module is on the classpath
at
org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:163)
at
org.apache.flink.table.client.gateway.context.ExecutionContext.createTableEnvironment(ExecutionContext.java:111)
at
org.apache.flink.table.client.gateway.context.ExecutionContext.(ExecutionContext.java:66)
at
org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:247)
at
org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:87)
at
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:87)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
... 1 more
Caused by: org.apache.flink.table.api.ValidationException: Multiple
factories for identifier 'default' that implement
'org.apache.flink.table.delegation.ExecutorFactory' found in the classpath.

Ambiguous factory classes are:

org.apache.flink.table.planner.delegation.DefaultExecutorFactory
org.apache.flink.table.planner.loader.DelegateExecutorFactory
at
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:553)
at
org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:154)
... 8 more



-- 
Best Regards

Jeff Zhang


Re: scala shell not part of 1.14.4 download

2022-03-18 Thread Jeff Zhang
Hi Georg,

You can try Zeppelin 0.10.1 which supports scala 2.12 for
Flink interpreter. Internally, Flink interpreter of Zeppelin use scala
shell, you can write scala code and run it in an interactive way.

https://zeppelin.apache.org/download.html
https://zeppelin.apache.org/docs/0.10.1/interpreter/flink.html
https://medium.com/analytics-vidhya/learn-flink-sql-the-easy-way-d9d48a95ae57


On Fri, Mar 18, 2022 at 8:10 PM Martijn Visser 
wrote:

> Hi Georg,
>
> As far as I know, there has never been a Scala Shell for Scala 2.12
> because it was not supported, only for Scala 2.11. The Scala Shell also
> also been completely dropped with Flink 1.15.
>
> Best regards,
>
> Martijn
>
> On Fri, 18 Mar 2022 at 12:43, Georg Heiler 
> wrote:
>
>> Hi,
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/repls/scala_shell/
>> mentions:
>>
>> bin/start-scala-shell.sh local
>>
>> a script to start a scala REPL shell.
>>
>> But the download for Flink 
>> https://www.apache.org/dyn/closer.lua/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz
>>
>> Does not seem to include this script anymore.
>>
>> Am I missing something?
>>
>> How can I still start a scala repl?
>>
>> Best,
>>
>> Georg
>>
>>

-- 
Best Regards

Jeff Zhang


flink sql 如何提高下游并发度?

2022-01-09 Thread Jeff
当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?

Re:Re:sink jdbc超时问题(无心跳包)

2021-12-19 Thread Jeff



会有? 什么意思呢? 我现在用的是1.13.2没有呀,相关配置也没。











在 2021-12-20 10:43:05,"Michael Ran"  写道:
>会有判断连接是否有效,以及重试的操作
>在 2021-12-20 11:39:23,"Jeff"  写道:
>>sink jdbc长时间未有数据写入后,再写数据时会出现连接超时问题,请问flink jdbc connector是否有像连接池那样的心跳功能?


sink jdbc超时问题(无心跳包)

2021-12-19 Thread Jeff
sink jdbc长时间未有数据写入后,再写数据时会出现连接超时问题,请问flink jdbc connector是否有像连接池那样的心跳功能?

Re:flink on native k8s模式下CPU使用率不高问题

2021-12-19 Thread Jeff
升级版本没有用的,我用的是flink 
1.13.2也遇到这个问题,原因是它request与limit相同,所以后来我改了它的源代码,你可以参考一下:https://github.com/jeff-zou/flink.git
  ,我主要是改了KubernetesUtils.java这个类,利用external resource传入参数来替换request

















在 2021-12-18 09:15:06,"casel.chen"  写道:
>所用flink版本是1.12.5,部署作业到native k8s设置的不管是 kubernetes.taskmanager.cpu 还是 
>kubernetes.jobmanager.cpu 最终在k8s yaml文件中显示的cpu 
>request和limit都是一样的。这会导致作业的CPU使用率很低,因为我们发现作业刚启动的时候所需要的CPU资源要远远高于作业实际运行起来的CPU资源,二者可能相差近5倍左右。如果设置的cpu较低的话,作业启动需要花费很长时间。
>如何才能够提高作业CPU使用率呢?可以直接修改k8s 
>yaml文件将request设置得高一些,而limit设置低一些吗?还有更好的办法吗?升级Flink版本有望解决么?


Re:Re:Re:Re:flink本地编译卡住

2021-12-11 Thread Jeff
好的,我试一下

















在 2021-12-11 16:32:01,"Yuepeng Pan"  写道:
>Hi, Jeff.
>
>
>  根据你提示的部分原信息:
>  [1].可以先尝试核对一下maven版本,这个页面中有一些关于maven构建flink的一些注意事项: 
> https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/flinkDev/building.html
>  [2].另外可以构建命令之前尝试添加maven 的 clean, 或者删除卡顿模块生成目标jar包在本地仓库的目录位置
>  [3].确认下是不是mvn内存过小导致jvm卡顿, 如果是这个问题,可以适度根据机器资源更改参数配置
>  [4].如果还是不能确定问题,可以尝试一下 -X -e 启动 maven 的debug级别信息以获得更多构建时候的输出细节
>  以上,仅供参考。
>
>
>祝好,
>Yuepeng Pan.
>At 2021-12-11 16:53:44, "Jeff"  wrote:
>>
>>
>>
>>
>>
>>
>>able-runtime-blink\target\dependency-reduced-pom.xml
>>
>>[INFO]
>>
>>[INFO] --- maven-surefire-plugin:2.22.1:test (integration-tests) @ 
>>flink-table-r
>>
>>untime-blink_2.12 ---
>>
>>[INFO] Tests are skipped.
>>
>>[INFO]
>>
>>[INFO] --- maven-install-plugin:2.5.2:install (default-install) @ 
>>flink-table-ru
>>
>>ntime-blink_2.12 ---
>>
>>[INFO] Installing 
>>G:\git\flink\flink\flink-table\flink-table-runtime-blink\targe
>>
>>t\flink-table-runtime-blink_2.12-1.13-SNAPSHOT.jar to 
>>G:\Java\maven\repository\o
>>
>>rg\apache\flink\flink-table-runtime-blink_2.12\1.13-SNAPSHOT\flink-table-runtime
>>
>>-blink_2.12-1.13-SNAPSHOT.jar
>>
>>[INFO] Installing 
>>G:\git\flink\flink\flink-table\flink-table-runtime-blink\targe
>>
>>t\dependency-reduced-pom.xml to 
>>G:\Java\maven\repository\org\apache\flink\flink-
>>
>>table-runtime-blink_2.12\1.13-SNAPSHOT\flink-table-runtime-blink_2.12-1.13-SNAPS
>>
>>HOT.pom
>>
>>[INFO] Installing 
>>G:\git\flink\flink\flink-table\flink-table-runtime-blink\targe
>>
>>t\flink-table-runtime-blink_2.12-1.13-SNAPSHOT-tests.jar to 
>>G:\Java\maven\reposi
>>
>>tory\org\apache\flink\flink-table-runtime-blink_2.12\1.13-SNAPSHOT\flink-table-r
>>
>>untime-blink_2.12-1.13-SNAPSHOT-tests.jar
>>
>>[INFO] Installing 
>>G:\git\flink\flink\flink-table\flink-table-runtime-blink\targe
>>
>>t\flink-table-runtime-blink_2.12-1.13-SNAPSHOT-tests.jar to 
>>G:\Java\maven\reposi
>>
>>tory\org\apache\flink\flink-table-runtime-blink_2.12\1.13-SNAPSHOT\flink-table-r
>>
>>untime-blink_2.12-1.13-SNAPSHOT-tests.jar
>>
>>
>>
>>
>>就是卡在这里,没有其它信息了。是不是它在后台编译nodejs前端代码码? 但等了一天多也没有反应。
>>
>>
>>
>>
>>在 2021-12-11 15:47:46,"Yuepeng Pan"  写道:
>>
>>图片挂掉了。
>>可以外链到图床或者直接贴一下关键的原始提示信息。
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2021-12-11 16:34:41,"Jeff"  写道:
>>
>>
>>
>>
>>根据官方建议的maven打包命令: mvm install -Dfast -DskipTests -Dscalla-2.12 -T 1C 
>>,但我在本地编译打包总是卡在flink-table-runtim-blink这里,也没有错误提示,如下图:
>>请问有什么处理方法么?
>>
>>
>>
>>
>>
>>
>> 
>>
>>
>>
>>
>>
>> 


Re:Re:flink本地编译卡住

2021-12-11 Thread Jeff






able-runtime-blink\target\dependency-reduced-pom.xml

[INFO]

[INFO] --- maven-surefire-plugin:2.22.1:test (integration-tests) @ flink-table-r

untime-blink_2.12 ---

[INFO] Tests are skipped.

[INFO]

[INFO] --- maven-install-plugin:2.5.2:install (default-install) @ flink-table-ru

ntime-blink_2.12 ---

[INFO] Installing G:\git\flink\flink\flink-table\flink-table-runtime-blink\targe

t\flink-table-runtime-blink_2.12-1.13-SNAPSHOT.jar to G:\Java\maven\repository\o

rg\apache\flink\flink-table-runtime-blink_2.12\1.13-SNAPSHOT\flink-table-runtime

-blink_2.12-1.13-SNAPSHOT.jar

[INFO] Installing G:\git\flink\flink\flink-table\flink-table-runtime-blink\targe

t\dependency-reduced-pom.xml to G:\Java\maven\repository\org\apache\flink\flink-

table-runtime-blink_2.12\1.13-SNAPSHOT\flink-table-runtime-blink_2.12-1.13-SNAPS

HOT.pom

[INFO] Installing G:\git\flink\flink\flink-table\flink-table-runtime-blink\targe

t\flink-table-runtime-blink_2.12-1.13-SNAPSHOT-tests.jar to G:\Java\maven\reposi

tory\org\apache\flink\flink-table-runtime-blink_2.12\1.13-SNAPSHOT\flink-table-r

untime-blink_2.12-1.13-SNAPSHOT-tests.jar

[INFO] Installing G:\git\flink\flink\flink-table\flink-table-runtime-blink\targe

t\flink-table-runtime-blink_2.12-1.13-SNAPSHOT-tests.jar to G:\Java\maven\reposi

tory\org\apache\flink\flink-table-runtime-blink_2.12\1.13-SNAPSHOT\flink-table-r

untime-blink_2.12-1.13-SNAPSHOT-tests.jar




就是卡在这里,没有其它信息了。是不是它在后台编译nodejs前端代码码? 但等了一天多也没有反应。




在 2021-12-11 15:47:46,"Yuepeng Pan"  写道:

图片挂掉了。
可以外链到图床或者直接贴一下关键的原始提示信息。










在 2021-12-11 16:34:41,"Jeff"  写道:




根据官方建议的maven打包命令: mvm install -Dfast -DskipTests -Dscalla-2.12 -T 1C 
,但我在本地编译打包总是卡在flink-table-runtim-blink这里,也没有错误提示,如下图:
请问有什么处理方法么?






 





 

flink本地编译卡住

2021-12-11 Thread Jeff



根据官方建议的maven打包命令: mvm install -Dfast -DskipTests -Dscalla-2.12 -T 1C 
,但我在本地编译打包总是卡在flink-table-runtim-blink这里,也没有错误提示,如下图:
请问有什么处理方法么?



flink本地编译卡住

2021-12-10 Thread Jeff
根据官方建议的maven打包命令: mvm install -Dfast -DskipTests -Dscalla-2.12 -T 1C 
,但我在本地编译打包总是卡在flink-table-runtim-blink这里,也没有错误提示,如下图:
请问有什么处理方法么?




 

Re: How to solve the target:jvm-1.8 error when run start-scala-shell.sh

2021-10-30 Thread Jeff Zhang
The reason is that flink-scala-shell uses scala-2.11 which uses jvm-1.6 as
its target by default, that's why it can not use any library that depends
on jvm-1.8.

You can use Zeppelin instead which supports scala-shell of scala-2.12, I
have verified that it works in Zeppelin when you use scala-2.12.

Check these links for more details.

https://zeppelin.apache.org/docs/0.10.0/interpreter/flink.html
https://medium.com/analytics-vidhya/learn-flink-sql-the-easy-way-d9d48a95ae57



Jing Lu  于2021年10月30日周六 上午2:03写道:

> Hi Flink users,
>
> I am working on testing my code in start-scala-shell.sh. The flink version
> is: flink-1.12.0-bin-scala_2.11.tgz. I put the jar file (
> https://mvnrepository.com/artifact/software.amazon.awssdk/sts/2.15.65) in
> the lib folder.
>
> Then run the start-scala-shell.sh REPL:
>
> scala> import software.amazon.awssdk.services.sts.StsAsyncClient
> import software.amazon.awssdk.services.sts.StsAsyncClient
>
> scala> StsAsyncClient.builder
> :72: error: Static methods in interface require -target:jvm-1.8
>StsAsyncClient.builder
>
> Why do I have this error? Is there any way to solve this problem?
>
>
> Thanks,
> Jing
>
>

-- 
Best Regards

Jeff Zhang


Re: modify the classloader of JM dynamically to handle "ADD JAR hdfs://" statement

2021-10-16 Thread Jeff Zhang
Hi vtygoss,

Have you taken a look at Zeppelin ? Zeppelin support Flink UDF in several
different approaches, and as you said, it is not a good practice to put udf
jars under $FLINK_HOME/lib. In Zeppelin you don't do that, you can
dynamically load udf jars. Check the following doc for more details.

https://zeppelin.apache.org/docs/0.10.0/interpreter/flink.html#flink-udf



vtygoss  于2021年10月16日周六 下午4:54写道:

> Hi, community!
>
>
> I am working on building a stream processing platform using Flink 1.12.0.
> I met a problem in the scenario of SQL Application migration from
> SparkSQL/HiveSQL to FlinkSQL.
>
>
> How to dynamically modify the classloader of the JobManager already
> launched to handle "ADD JAR HDFS://..." statement in sql text?
>
>
> I already found the way to modify the classloader of all TaskManager
> dynamically through "Catalog#getHiveConf#setAuxJar" or
> "pipeline.classpath"; But i can't find the way to modify the classloader of
> JobManager dynamically, then the Application will fail because of UDF
> ClassNotFoundException. And i don't want put the udf-jar into
> $FLINK_HOME/lib, that's too heavy.
>
>
> Thanks for your any suggestions or replies!
>
>
> Best Regards!
>
>
>
>

-- 
Best Regards

Jeff Zhang


【Announce】Zeppelin 0.10.0 is released, Flink on Zeppelin Improved

2021-08-25 Thread Jeff Zhang
Hi Flink users,

We (Zeppelin community) are very excited to announce Zeppelin 0.10.0 is
officially released. In this version, we made several improvements on Flink
interpreter.  Here's the main features of Flink on Zeppelin:

   - Support multiple versions of Flink
   - Support multiple versions of Scala
   - Support multiple languages
   - Support multiple execution modes
   - Support Hive
   - Interactive development
   - Enhancement on Flink SQL
   - Multi-tenancy
   - Rest API Support

Take a look at this document for more details:
https://zeppelin.apache.org/docs/0.10.0/interpreter/flink.html
The quickest way to try Flink on Zeppelin is via its docker image
https://zeppelin.apache.org/docs/0.10.0/interpreter/flink.html#play-flink-in-zeppelin-docker

Besides these, here’s one blog about how to run Flink sql cookbook on
Zeppelin,
https://medium.com/analytics-vidhya/learn-flink-sql-the-easy-way-d9d48a95ae57
The easy way to learn Flink Sql.

Hope it would be helpful for you and welcome to join our community to
discuss with others. http://zeppelin.apache.org/community.html


-- 
Best Regards

Jeff Zhang


【Announce】Zeppelin 0.10.0 is released, Flink on Zeppelin Improved

2021-08-25 Thread Jeff Zhang
Hi Flink users,

We (Zeppelin community) are very excited to announce Zeppelin 0.10.0 is
officially released. In this version, we made several improvements on Flink
interpreter.  Here's the main features of Flink on Zeppelin:

   - Support multiple versions of Flink
   - Support multiple versions of Scala
   - Support multiple languages
   - Support multiple execution modes
   - Support Hive
   - Interactive development
   - Enhancement on Flink SQL
   - Multi-tenancy
   - Rest API Support

Take a look at this document for more details:
https://zeppelin.apache.org/docs/0.10.0/interpreter/flink.html
The quickest way to try Flink on Zeppelin is via its docker image
https://zeppelin.apache.org/docs/0.10.0/interpreter/flink.html#play-flink-in-zeppelin-docker

Besides these, here’s one blog about how to run Flink sql cookbook on
Zeppelin,
https://medium.com/analytics-vidhya/learn-flink-sql-the-easy-way-d9d48a95ae57
The easy way to learn Flink Sql.

Hope it would be helpful for you and welcome to join our community to
discuss with others. http://zeppelin.apache.org/community.html


-- 
Best Regards

Jeff Zhang


Re: Flink使用SQL注册UDF未来有规划吗

2021-08-16 Thread Jeff Zhang
https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2

Ada Luna  于2021年8月16日周一 下午2:26写道:

> 目前注册UDF要通过Table API。
> 未来会通过SQL直接将UDF注册到上下文中吗?
>


-- 
Best Regards

Jeff Zhang


Re: Obtain JobManager Web Interface URL

2021-08-03 Thread Jeff Zhang
gt;> > > =Cu-w4-hIu8MGtvnq2Ob8StpWCZhbFmwN4knnt35NqOM=
>> > >
>> > > Best,
>> > > Yangze Guo
>> > >
>> > > On Fri, Jul 30, 2021 at 1:41 AM Hailu, Andreas [Engineering] <
>> andreas.ha...@gs.com> wrote:
>> > > >
>> > > > Hi team,
>> > > >
>> > > >
>> > > >
>> > > > Is there a method available to obtain the JobManager’s REST url? We
>> originally overloaded CliFrontend#executeProgram and nabbed it from the
>> ClusterClient#getWebInterfaceUrl method, but it seems this method’s
>> signature has been changed and no longer available as of 1.10.0.
>> > > >
>> > > >
>> > > >
>> > > > Best,
>> > > >
>> > > > Andreas
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > 
>> > > >
>> > > > Your Personal Data: We may collect and process information about you
>> > > > that may be subject to data protection laws. For more information
>> > > > about how we use and disclose your personal data, how we protect
>> > > > your information, our legal basis to use your information, your
>> > > > rights and who you can contact, please refer to:
>> > > > http://www.gs.com/privacy-notices
>> > >
>> > > 
>> > >
>> > > Your Personal Data: We may collect and process information about you
>> > > that may be subject to data protection laws. For more information
>> > > about how we use and disclose your personal data, how we protect your
>> > > information, our legal basis to use your information, your rights and
>> > > who you can contact, please refer to:
>> > > www.gs.com/privacy-notices<http://www.gs.com/privacy-notices>
>> >
>> > 
>> >
>> > Your Personal Data: We may collect and process information about you
>> that may be subject to data protection laws. For more information about how
>> we use and disclose your personal data, how we protect your information,
>> our legal basis to use your information, your rights and who you can
>> contact, please refer to: www.gs.com/privacy-notices<
>> http://www.gs.com/privacy-notices>
>>
>

-- 
Best Regards

Jeff Zhang


Re: flink sql 依赖隔离

2021-07-22 Thread Jeff Zhang
Zeppelin 支持依赖的动态加载
https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2


Michael Ran  于2021年7月22日周四 下午8:07写道:

> 通过任务进行隔离引用呗。你们美团已经是k8s了吧?
> 在 2021-07-05 14:06:53,"silence"  写道:
> >请教大家目前flink sql有没有办法做到依赖隔离
> >比如connector,format,udf(这个最重要)等,
> >很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。
> >目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划
>


-- 
Best Regards

Jeff Zhang


Re: Registering custom metrics on the JobManager

2021-07-15 Thread Jeff Charles
We would like a counter of exceptions so we can alert if there's an
anomalous increase in them. I realize a counter in the JobManager would not
capture anywhere close to all exceptions but even capturing a count of a
subset that we're able to track would be helpful.

On Thu, Jul 15, 2021 at 3:47 PM Chesnay Schepler  wrote:

> This is currently not possible. What metric are you interested in?
>
> On 15/07/2021 21:16, Jeff Charles wrote:
> > Hi,
> >
> > I would like to register a custom metric on the JobManager as opposed
> > to a TaskManager. I cannot seem to locate any documentation that
> > indicates how to do this or even if it's currently possible or not.
> >
> > Does anyone have any guidance on how to register a custom metric on
> > the JobManager?
> >
> > Jeff Charles
>
>
>


Registering custom metrics on the JobManager

2021-07-15 Thread Jeff Charles
Hi,

I would like to register a custom metric on the JobManager as opposed to a
TaskManager. I cannot seem to locate any documentation that indicates how
to do this or even if it's currently possible or not.

Does anyone have any guidance on how to register a custom metric on the
JobManager?

Jeff Charles


Re: Re: flink sql平台多版本支持问题

2021-06-12 Thread Jeff Zhang
如果不是native k8s的话,现在已经支持了,用remote模式就可以,
https://www.yuque.com/jeffzhangjianfeng/gldg8w/engh3w
native k8s的话,社区正在做,这是PR: https://github.com/apache/zeppelin/pull/4116


casel.chen  于2021年6月13日周日 上午9:39写道:

> 嗯,flink on zeppelin也是一个不错的选择,只是因为现在flink on
> zeppelin还不支持作业运行在kubernetes上,所以暂时还无法直接使用,未来支持后可以考虑引入。
> 谢谢大佬给的建议。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-06-13 07:21:46,"Jeff Zhang"  写道:
> >另外一个选择是flink on zeppelin,可以调用flink on zeppelin的rest api,把zeppelin当做是flink
> >job server, zeppelin天然支持flink 1.10之后的所有版本。钉钉群:32803524
> >
> >casel.chen  于2021年6月12日周六 下午5:56写道:
> >
> >> 需求背景:
> >> 因为不同Flink版本SQL底层实现不相同,同一个带状态的Flink SQL作业目前跨版本升级社区还不支持。所以如果基于Flink
> >> SQL开发实时计算平台的话,平台需要支持不同Flink版本。早期的Flink SQL作业用的是1.11,最新的Flink
> >> SQL作业采用的是1.13开发的。
> >>
> >>
> >> 而让平台支持不同Flink版本,我能想到有三种实现方案:
> >>
> >>
> >> 1. 平台直接调用 flink run 或 flink run-application 提交作业
> >> 优点:实现简单,每个flink版本都会带这个shell脚本
> >>
> >>
> 缺点:受限于脚本提供的功能,像语法检查、格式化sql、获取sql执行计划及sql结构和血缘这些“高级”功能没有在官方脚本中提供(能否完善下呢?),如果要使用的话得自己扩展实现(同样需要自己维护多版本)
> >>
> >>
> >> 2. 平台调用 flink sql gateway 暴露的 restful api 提交作业
> >> 优点:实现简单,通过gateway屏蔽上层应用调用多版本flink api问题
> >> 缺点:需要额外扩展开发和部署gateway,目前暂不清楚社区对gateway的定位是什么?只是配合flink jdbc
> >> driver使用么?这两项目目前活跃度都不高
> >>
> >>
> >> 3. 需要维护多个平台版本,像 flink-sql-connector-hive 支持多版本hive那样
> >> 优点:现有平台就是基于特定版本flink api开发的,改动小,需要抽取公用接口
> >> 缺点:需要“复制”平台多份实现以支持不同flink版本,开发和维护量比较大
> >>
> >>
> >> 目前,我暂时prefer方案2,可能不对,也可能还有更优的我没想到,欢迎各位大佬们给点建议。
> >>
> >>
> >
> >--
> >Best Regards
> >
> >Jeff Zhang
>


-- 
Best Regards

Jeff Zhang


Re: flink sql平台多版本支持问题

2021-06-12 Thread Jeff Zhang
另外一个选择是flink on zeppelin,可以调用flink on zeppelin的rest api,把zeppelin当做是flink
job server, zeppelin天然支持flink 1.10之后的所有版本。钉钉群:32803524

casel.chen  于2021年6月12日周六 下午5:56写道:

> 需求背景:
> 因为不同Flink版本SQL底层实现不相同,同一个带状态的Flink SQL作业目前跨版本升级社区还不支持。所以如果基于Flink
> SQL开发实时计算平台的话,平台需要支持不同Flink版本。早期的Flink SQL作业用的是1.11,最新的Flink
> SQL作业采用的是1.13开发的。
>
>
> 而让平台支持不同Flink版本,我能想到有三种实现方案:
>
>
> 1. 平台直接调用 flink run 或 flink run-application 提交作业
> 优点:实现简单,每个flink版本都会带这个shell脚本
>
> 缺点:受限于脚本提供的功能,像语法检查、格式化sql、获取sql执行计划及sql结构和血缘这些“高级”功能没有在官方脚本中提供(能否完善下呢?),如果要使用的话得自己扩展实现(同样需要自己维护多版本)
>
>
> 2. 平台调用 flink sql gateway 暴露的 restful api 提交作业
> 优点:实现简单,通过gateway屏蔽上层应用调用多版本flink api问题
> 缺点:需要额外扩展开发和部署gateway,目前暂不清楚社区对gateway的定位是什么?只是配合flink jdbc
> driver使用么?这两项目目前活跃度都不高
>
>
> 3. 需要维护多个平台版本,像 flink-sql-connector-hive 支持多版本hive那样
> 优点:现有平台就是基于特定版本flink api开发的,改动小,需要抽取公用接口
> 缺点:需要“复制”平台多份实现以支持不同flink版本,开发和维护量比较大
>
>
> 目前,我暂时prefer方案2,可能不对,也可能还有更优的我没想到,欢迎各位大佬们给点建议。
>
>

-- 
Best Regards

Jeff Zhang


Re: Jupyter PyFlink Web UI

2021-06-09 Thread Jeff Zhang
BTW, you can also send email to zeppelin user maillist to join zeppelin
slack channel to discuss more details.
http://zeppelin.apache.org/community.html


Jeff Zhang  于2021年6月9日周三 下午6:34写道:

> Hi Maciek,
>
> You can try zeppelin which support pyflink and display flink job url
> inline.
>
> http://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html
>
>
> Maciej Bryński  于2021年6月9日周三 下午1:53写道:
>
>> Nope.
>> I found the following solution.
>>
>> conf = Configuration()
>> env =
>> StreamExecutionEnvironment(get_gateway().jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf._j_configuration))
>> env_settings =
>> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
>> table_env =
>> StreamTableEnvironment.create(stream_execution_environment=env,
>> environment_settings=env_settings)
>>
>> I also created the bug report
>> https://issues.apache.org/jira/browse/FLINK-22924.
>> I think this API should be exposed in Python.
>>
>> śr., 9 cze 2021 o 04:57 Dian Fu  napisał(a):
>> >
>> > Hi Macike,
>> >
>> > You could try if the following works:
>> >
>> > ```
>> > table_env.get_config().get_configuration().set_string("rest.bind-port",
>> "xxx")
>> > ```
>> >
>> > Regards,
>> > Dian
>> >
>> > > 2021年6月8日 下午8:26,maverick  写道:
>> > >
>> > > Hi,
>> > > I've got a question. I'm running PyFlink code from Jupyter Notebook
>> starting
>> > > TableEnvironment with following code:
>> > >
>> > > env_settings =
>> > >
>> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
>> > > table_env = TableEnvironment.create(env_settings)
>> > >
>> > > How can I enable Web UI in this code?
>> > >
>> > > Regards,
>> > > Maciek
>> > >
>> > >
>> > >
>> > > --
>> > > Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>> >
>>
>>
>> --
>> Maciek Bryński
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


-- 
Best Regards

Jeff Zhang


Re: Jupyter PyFlink Web UI

2021-06-09 Thread Jeff Zhang
Hi Maciek,

You can try zeppelin which support pyflink and display flink job url inline.

http://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html


Maciej Bryński  于2021年6月9日周三 下午1:53写道:

> Nope.
> I found the following solution.
>
> conf = Configuration()
> env =
> StreamExecutionEnvironment(get_gateway().jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf._j_configuration))
> env_settings =
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(stream_execution_environment=env,
> environment_settings=env_settings)
>
> I also created the bug report
> https://issues.apache.org/jira/browse/FLINK-22924.
> I think this API should be exposed in Python.
>
> śr., 9 cze 2021 o 04:57 Dian Fu  napisał(a):
> >
> > Hi Macike,
> >
> > You could try if the following works:
> >
> > ```
> > table_env.get_config().get_configuration().set_string("rest.bind-port",
> "xxx")
> > ```
> >
> > Regards,
> > Dian
> >
> > > 2021年6月8日 下午8:26,maverick  写道:
> > >
> > > Hi,
> > > I've got a question. I'm running PyFlink code from Jupyter Notebook
> starting
> > > TableEnvironment with following code:
> > >
> > > env_settings =
> > >
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> > > table_env = TableEnvironment.create(env_settings)
> > >
> > > How can I enable Web UI in this code?
> > >
> > > Regards,
> > > Maciek
> > >
> > >
> > >
> > > --
> > > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> >
>
>
> --
> Maciek Bryński
>


-- 
Best Regards

Jeff Zhang


Re: No result shown when submitting the SQL in cli

2021-05-11 Thread Jeff Zhang
The result is printed in TM.
It is local mode in IDE, so TM runs in your local jvm that's why you see
the result
While it is distributed mode (either yarn or standalone mode) when you are
in sql-client, you should be able to see the result in TM logs.


tao xiao  于2021年5月11日周二 下午11:40写道:

> Does anyone help with this question?
>
> On Thu, May 6, 2021 at 3:26 PM tao xiao  wrote:
>
>> Hi team,
>>
>> I wrote a simple SQL job to select data from Kafka. I can see results
>> printing out in IDE but when I submit the job to a standalone cluster in
>> CLI there is no result shown. I am sure the job is running well in the
>> cluster with debug log suggesting that the kafka consumer is fetching data
>> from Kafka. I enabled debug log in CLI and I don't see any obvious log.
>> Here is the job code snippet
>>
>> public static void main(String[] args) throws Exception {
>>   StreamTableEnvironment tableEnv = StreamTableEnvironment
>>   
>> .create(StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1));
>>
>>   String sqls = new String(Files.readAllBytes(Paths.get(args[0])));
>>   splitIgnoreQuota(sqls, ';').forEach(sql -> {
>> TableResult tableResult = tableEnv.executeSql(sql);
>> tableResult.print();
>>   });
>> }
>>
>> It simply parses a sql file and execute the statements
>>
>> Here is the SQL statements
>>
>> CREATE TABLE t1 (
>>   `f1` STRING,
>>   `f2` STRING
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'topic',
>>   'properties.group.id' = 'test1',
>>   'properties.max.partition.fetch.bytes' = '16384',
>>   'properties.enable.auto.commit' = 'false',
>>   'properties.bootstrap.servers' = 'kafka:9092',
>>   'scan.startup.mode' = 'earliest-offset',
>>   'format' = 'json'
>> );
>>
>> SELECT * FROM t1
>>
>>
>> Below is the result I got from IDE
>> | +I | b8f5 |   abcd |
>> | +I | b8f5 |   abcd |
>>
>> And this is the result from CLI
>> bin/flink run  -m localhost:8081 -c kafka.sample.flink.SQLSample
>> ~/workspaces/kafka-sample/target/kafka-sample-0.1.2-jar-with-dependencies.jar
>> /sample.sql
>> ++
>> | result |
>> ++
>> | OK |
>> ++
>> 1 row in set
>> Job has been submitted with JobID ace45d2ff850675243e2663d3bf11701
>> ++++
>> | op |   uuid |ots |
>> ++++
>>
>>
>> --
>> Regards,
>> Tao
>>
>
>
> --
> Regards,
> Tao
>


-- 
Best Regards

Jeff Zhang


Re: 提交flink-sql 出现无法部署到yarn集群

2021-04-14 Thread Jeff Zhang
看 yarn app log

张锴  于2021年4月14日周三 下午8:00写道:

> 在用flink-sql的方式连接hive时 ,出现以下错误:
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: Could not deploy Yarn job cluster.
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1688)
> at
>
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
> Could not deploy Yarn job cluster.
> at
>
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)
> at
>
> org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)
> at
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)
> at
>
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
> at
>
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
> at
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
> at
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1765)
> at com.erwan.flinksql.FlinkConnectHive.main(FlinkConnectHive.java:49)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
> ... 11 more
> Caused by:
> org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
> YARN application unexpectedly switched to state FAILED during deployment.
> Diagnostics from YARN: Application application_1618298202025_0017 failed 1
> times (global limit =2; local limit is =1) due to AM Container for
> appattempt_1618298202025_0017_01 exited with  exitCode: 2
> Failing this attempt.Diagnostics: [2021-04-14 19:04:02.506]Exception from
> container-launch.
> Container id: container_e13_1618298202025_0017_01_01
> Exit code: 2。
>
> 由于错误原因不明显,不好排查,也不确定是到底是哪里的问题,请问有什么办法能够定位问题。
>


-- 
Best Regards

Jeff Zhang


Re: Re: flink sql如何从远程加载jar包中的udf

2021-03-11 Thread Jeff Zhang
Zeppelin 支持加载UDF jar的,可以参考下面的代码,不过架构上可能与你们的原有架构会有所差别

https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2#8iONE
https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala#L469


chenxyz  于2021年3月12日周五 上午9:42写道:

> 目前这种方法不可行,在公司的平台化系统里提交flink任务,自己能掌控的只有代码这块。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-03-11 16:39:24,"silence"  写道:
> >启动时通过-C加到classpath里试试
> >
> >
> >
> >--
> >Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best Regards

Jeff Zhang


flink sql 并发数问题

2021-02-24 Thread Jeff
hi all,


用flink sql消费kafka数据,有效并发数是由kafka分区数来决定的,请问有什么方法提高有效并发数吗? 因为有一个UDF是请求python 
http服务,速度不快,有没有方法单独提高这一块的并发数呢?  

Re: flink-sql-gateway稳定性如何,可以在生产环境使用吗?

2021-01-26 Thread Jeff Zhang
zeppelin 有 rest api 接口,https://www.yuque.com/jeffzhangjianfeng/gldg8w/pz2xoh

jinsx  于2021年1月27日周三 下午2:30写道:

> 如果使用zeppelin,zeppelin可以提供rpc接口吗
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best Regards

Jeff Zhang


Re:Re:flink sql是否有json_to_map函数,或者实现方法是怎么样的?

2021-01-21 Thread Jeff
但udtf需要指定结果返回个数及字段名,如:@FunctionHint(output = @DataTypeHint("ROW"),但我希望把这个udtf弄得通用一点,因为json结构是不确定的,不受字段名跟字段个数限制。















在 2021-01-21 18:02:06,"Michael Ran"  写道:
>特定的的map也是需要类型的,如果你在乎类型建议里面统一以字符串的udtf实现,后续再进行转换
>在 2021-01-21 18:35:18,"Jeff"  写道:
>>hi all,
>>
>>
>>有没有什么办法可以将json转成map呢?类似于str_to_map函数。
>>
>>
>>版本:flink 1.11 
>>planner: blink sql
>>
>>
>>需求背景: UDF函数通过http请求获得了json字符串,希望能够直接使用json内字段, 
>>UDTF可以满足一部分要求,但不灵活,因为定义UDTF时需要指定输出字段及类型,限制很大。


Re:Re:flink sql是否有json_to_map函数,或者实现方法是怎么样的?

2021-01-21 Thread Jeff
但udtf需要指定结果返回个数及字段名,如:@FunctionHint(output = @DataTypeHint("ROW"),但我希望把这个udtf弄得通用一点,因为json结构是不确定的,不受字段名跟字段个数限制。

















在 2021-01-21 18:02:06,"Michael Ran"  写道:
>特定的的map也是需要类型的,如果你在乎类型建议里面统一以字符串的udtf实现,后续再进行转换
>在 2021-01-21 18:35:18,"Jeff"  写道:
>>hi all,
>>
>>
>>有没有什么办法可以将json转成map呢?类似于str_to_map函数。
>>
>>
>>版本:flink 1.11 
>>planner: blink sql
>>
>>
>>需求背景: UDF函数通过http请求获得了json字符串,希望能够直接使用json内字段, 
>>UDTF可以满足一部分要求,但不灵活,因为定义UDTF时需要指定输出字段及类型,限制很大。


flink sql是否有json_to_map函数,或者实现方法是怎么样的?

2021-01-21 Thread Jeff
hi all,


有没有什么办法可以将json转成map呢?类似于str_to_map函数。


版本:flink 1.11 
planner: blink sql


需求背景: UDF函数通过http请求获得了json字符串,希望能够直接使用json内字段, 
UDTF可以满足一部分要求,但不灵活,因为定义UDTF时需要指定输出字段及类型,限制很大。

flink sql是否有json_to_map函数,或者实现方法是怎么样的?

2021-01-21 Thread Jeff
hi all,


有没有什么办法可以将json转成map呢?类似于str_to_map函数。


版本:flink 1.11 
planner: blink sql


需求背景: UDF函数通过http请求获得了json字符串,希望能够直接使用json内字段, 
UDTF可以满足一部分要求,但不灵活,因为定义UDTF时需要指定输出字段及类型,限制很大。





Re: 获取flinksql返回的查询结果

2021-01-18 Thread Jeff Zhang
Sink 到内存里,然后你自己处理(print出来还是发送到web前端)
可以参考zeppelin源码 https://github.com/apache/zeppelin/tree/master/flink


黑色  于2021年1月18日周一 下午8:42写道:

> 请教个问题,我想实现类似sql-cli里功能一样,或者zepplin里的我在页面上编写select * from ,
>
>
> 在页面下面得到返回的结果显示,类似zepplin在开发区写sql,下面的输出区显示返回的结果,这个功能如何实现呢
> 各位大哥们帮忙看一看?
>
>
> 现在在flinksql上想看结果,还的定义一个with='print',跑到ui页面上去看,太麻烦了



-- 
Best Regards

Jeff Zhang


[Announce] Zeppelin 0.9.0 is released (Flink on Zeppelin)

2021-01-17 Thread Jeff Zhang
Hi flink folks,

I'd like to tell you that Zeppelin 0.9.0 is officially released, in this
new version we made a big refactoring and improvement on flink support. It
supports 3 major versions of flink (1.10, 1.11, 1.12)

You can download zeppelin 0.9.0 here.
http://zeppelin.apache.org/download.html

And check the following 2 links for more details of how to use flink on
zeppelin
https://app.gitbook.com/@jeffzhang/s/flink-on-zeppelin/
http://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html


-- 
Best Regards

Jeff Zhang


[Announce] Zeppelin 0.9.0 is released (Flink on Zeppelin)

2021-01-17 Thread Jeff Zhang
Hi flink folks,

I'd like to tell you that Zeppelin 0.9.0 is officially released, in this
new version we made a big refactoring and improvement on flink support. It
supports 3 major versions of flink (1.10, 1.11, 1.12)

You can download zeppelin 0.9.0 here.
http://zeppelin.apache.org/download.html

And check the following 2 links for more details of how to use flink on
zeppelin
https://app.gitbook.com/@jeffzhang/s/flink-on-zeppelin/
http://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html


-- 
Best Regards

Jeff Zhang


Re: 构建Flink任务提交平台时兼容低版本Flink的问题

2021-01-15 Thread Jeff Zhang
可以参考 zeppelin的方法,zeppelin支持多个版本的flink (1.10, 1.11, 1.12)

https://www.yuque.com/jeffzhangjianfeng/gldg8w/bam5y1
https://github.com/apache/zeppelin/tree/master/flink


Shengnan YU  于2021年1月16日周六 上午8:15写道:

> 请问各位大佬,目前我用Flink
> 1.11的内部API实现了Flink任务提交的平台,但是由于1.11相对1.10大改了客户端的实现方案,导致低版本的Flink应用难以适配,请问大家有没有在构建平台时有兼容多版本Flink任务的好方案,并且能够方便快速升级迭代支持新的Flink版本,我现在想到的就是用命令行去做。
>


-- 
Best Regards

Jeff Zhang


Re: zeppelin+flink1.12问题

2020-12-15 Thread Jeff Zhang
钉钉群里的新版本已经解决了,钉钉群号:32803524

赵一旦  于2020年12月15日周二 下午4:37写道:

> 如题,zeppelin+flink1.12报错(select 1)。
> org.apache.zeppelin.interpreter.InterpreterException:
> org.apache.zeppelin.interpreter.InterpreterException:
> java.lang.NoSuchMethodError:
>
> org.apache.flink.api.common.ExecutionConfig.disableSysoutLogging()Lorg/apache/flink/api/common/ExecutionConfig;
> at
>
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76)
> at
>
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:760)
> at
>
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
> at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
> at
>
> org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
> at
>
> org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.zeppelin.interpreter.InterpreterException:
> java.lang.NoSuchMethodError:
>
> org.apache.flink.api.common.ExecutionConfig.disableSysoutLogging()Lorg/apache/flink/api/common/ExecutionConfig;
> at
>
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76)
> at
>
> org.apache.zeppelin.interpreter.Interpreter.getInterpreterInTheSameSessionByClassName(Interpreter.java:355)
> at
>
> org.apache.zeppelin.interpreter.Interpreter.getInterpreterInTheSameSessionByClassName(Interpreter.java:366)
> at
>
> org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.open(FlinkStreamSqlInterpreter.java:47)
> at
>
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
> ... 8 more
> Caused by: java.lang.NoSuchMethodError:
>
> org.apache.flink.api.common.ExecutionConfig.disableSysoutLogging()Lorg/apache/flink/api/common/ExecutionConfig;
> at
>
> org.apache.zeppelin.flink.FlinkScalaInterpreter.setTableEnvConfig(FlinkScalaInterpreter.scala:444)
> at
>
> org.apache.zeppelin.flink.FlinkScalaInterpreter.open(FlinkScalaInterpreter.scala:114)
> at
> org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:67)
> at
>
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
> ... 12 more
>


-- 
Best Regards

Jeff Zhang


Re:flink 1.9.2 容器ha部署是jobid 一直是00000000000000000000000000000000

2020-12-09 Thread Jeff



这个问题我也遇到了,请问后来怎么解决的呢? 更换成flink1.11.2都不行!














在 2020-03-24 07:14:07,"Peihui He"  写道:
>大家好,我在用flink
>1.9.2 部署到容器的时候如果不启动ha的情况下jobid是正常的,但是启动了就变成了
>这样的话,checkpoint的地址和ha的文件地址都一样了,导致checkpoint总是失败。
>不知道这是什么原因呢?


statementset下source怎么完全复用

2020-11-23 Thread Jeff
请问一下,flink 1.11statement set 怎么复用同一个source呢? 
希望同一个job里不同sink使用完全相同的数据,不是默认的用hash分流,这个有地方设置么?

Re: flink zeppelin的type参数(append/update/single)和flink的动态表有关系嘛

2020-11-22 Thread Jeff Zhang
和 flink 没关系,是zeppelin自己定义的参数,只影响select 语句,对于zeppelin的数据可视化有影响,不影响flink job

赵一旦  于2020年11月23日周一 下午1:11写道:

> 如题,这个参数是仅仅zeppelin自身的参数,用于决定如何展示数据之类的逻辑呢?
> 还是和flink任务也有关系。按照
> https://www.yuque.com/jeffzhangjianfeng/gldg8w/te2l1c#AzSOu
> 说法,append模式第一个结果列必须是时间,所以看起来更像是zeppelin自身的要求。
>
> 我看了下append方式执行,jdbc仍然使用的upsertSink。
>
> 所以谁确认下这个参数是不是和具体任务没啥关系。
>


-- 
Best Regards

Jeff Zhang


jobmanager与taskmanager之间用rpc通信,为什么taskmanager之间用netty通信?

2020-11-15 Thread Jeff
如题!  jobmanager与taskmanager之前通信也用netty通信不行吗?

Re: Flink作业运行失败

2020-10-15 Thread Jeff Zhang
你是hadoop2 吗?我记得这个情况只有hadoop3才会出现


gangzi <1139872...@qq.com> 于2020年10月16日周五 上午11:22写道:

> TM
> 的CLASSPATH确实没有hadoop-mapreduce-client-core.jar。这个难道是hadoop集群的问题吗?还是一定要shade-hadoop包,官方不推荐shade-hadoop包了。
>
> > 2020年10月16日 上午10:50,Jeff Zhang  写道:
> >
> > 你看看TM的log,里面有CLASSPATH的
> >
> > gangzi <1139872...@qq.com> 于2020年10月16日周五 上午10:11写道:
> >
> >> 我按照flink官方文档的做法,在hadoop集群每个节点上都:export HADOOP_CLASSPATH =`hadoop
> >> classpath`,但是报:java.lang.NoClassDefFoundError:
> >> org/apache/hadoop/mapred/JobConf
> >>
> >>
> 不知道这个是不是flink的bug,按照这个报错,是缺少:hadoop-mapreduce-client-core.jar这个jar包,但是这个包是在/usr/local/hadoop-2.10.0/share/hadoop/mapreduce/*:这个目录下的,这个目录是包含在HADOOP_CLASSPATH上的,按理说是会加载到的。
> >>
> >>> 2020年10月16日 上午9:59,Shubin Ruan  写道:
> >>>
> >>> export HADOOP_CLASSPATH=
> >>
> >>
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
>
>

-- 
Best Regards

Jeff Zhang


Re: Flink作业运行失败

2020-10-15 Thread Jeff Zhang
你看看TM的log,里面有CLASSPATH的

gangzi <1139872...@qq.com> 于2020年10月16日周五 上午10:11写道:

> 我按照flink官方文档的做法,在hadoop集群每个节点上都:export HADOOP_CLASSPATH =`hadoop
> classpath`,但是报:java.lang.NoClassDefFoundError:
> org/apache/hadoop/mapred/JobConf
>
> 不知道这个是不是flink的bug,按照这个报错,是缺少:hadoop-mapreduce-client-core.jar这个jar包,但是这个包是在/usr/local/hadoop-2.10.0/share/hadoop/mapreduce/*:这个目录下的,这个目录是包含在HADOOP_CLASSPATH上的,按理说是会加载到的。
>
> > 2020年10月16日 上午9:59,Shubin Ruan  写道:
> >
> > export HADOOP_CLASSPATH=****
>
>

-- 
Best Regards

Jeff Zhang


Re: Safer handling of Scala immutable collections

2020-10-14 Thread Jeff Zhang
Could you share your code to reproduce it ?

Rex Fenley  于2020年10月15日周四 上午5:54写道:

> Hello,
>
> I've been playing with UDFs using the Scala API and have repeatedly run
> into issues such as this:
> ```
> flink-taskmanager_1| java.lang.ClassCastException:
> scala.collection.immutable.Set$EmptySet$ cannot be cast to [J
> ```
> Is there something that can be done on Flink's end, either to catch these
> errors in type checking or to cast them in a sane manner during runtime?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 
Best Regards

Jeff Zhang


Re: 动态加载table和udf的方法

2020-10-09 Thread Jeff Zhang
zeppelin 支持直接写udf,参考 https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2
或者加入钉钉群讨论:32803524

Zeahoo Z  于2020年10月10日周六 上午9:04写道:

> 你好,在开发中遇到了下面这个困难。
>
> 目前将定义的table和udf函数写在了 conf/sql-client-defaults.yaml
>
> 文件中。程序运行没有问题,但是带来一个问题:如果我需要添加或者修改table或者udf文件的时候,需要重启flink程序。有没有办法可以让我动态地添加或者更新我的table和udf(table新增可以通过sql-client来添加,但是更倾向于将table的定义记录在文件,udf存在修改和新增的情况)。这样依赖可以保证flink不重启。
>


-- 
Best Regards

Jeff Zhang


Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-25 Thread Jeff Zhang
如果不是flink jar的job,可以用zeppelin sdk来提交flink job
https://www.yuque.com/jeffzhangjianfeng/gldg8w/pz2xoh


xiao cai  于2020年9月25日周五 下午4:23写道:

> Hi all:
> 大家好,我目前遇到一个flink 任务提交方面的困扰:
> 想要在自己的项目中(springboot)提交flink
> job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包
> 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。
> 非常感谢
>
>
> best,
> xiao



-- 
Best Regards

Jeff Zhang


Re: zeppelin指定的依赖去哪找

2020-09-24 Thread Jeff Zhang
没有搜索路径,需要用绝对路径

赵一旦  于2020年9月24日周四 下午3:22写道:

> 看了你文章,有jars。想继续问下,jars必须完全路径嘛,有没有什么默认的搜索路径,我简单写jar名字的。不想写死路径。
>
> 赵一旦  于2020年9月24日周四 下午3:17写道:
>
> > 这就有点麻烦了,公司机器一般不允许连接外部网络的。
> >
> > Jeff Zhang  于2020年9月24日周四 下午3:15写道:
> >
> >> flink.execution.packages 下的包会从maven仓库下载,看这里关于如何在zeppelin里管理第三方依赖的方法
> >> https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
> >>
> >>
> >> 赵一旦  于2020年9月24日周四 下午3:09写道:
> >>
> >> > 通过zeppelin写sql,之前sql-client可行的sql总是报错。
> >> >
> >> > zeppelin配置的FLINK_HOME的目录中lib下是有相关包的,但远程remote集群没有。
> >> > 之前sql-client方式是基于-l方式指定的。
> >> >
> >> >
> >>
> zeppelin情况下,貌似有个flink.execution.packages,但是并没说明这个指定的包去哪找的?是zeppelin配置的FLINK_HOME中的lib嘛?我lib中有包,但还是报错。
> >> >
> >>
> >>
> >> --
> >> Best Regards
> >>
> >> Jeff Zhang
> >>
> >
>


-- 
Best Regards

Jeff Zhang


Re: zeppelin指定的依赖去哪找

2020-09-24 Thread Jeff Zhang
两个办法:
1. 用私有的maven 仓库
2. 自己打jar包,用 flink.exection.jars 来指定
https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s#3BNYl

赵一旦  于2020年9月24日周四 下午3:17写道:

> 这就有点麻烦了,公司机器一般不允许连接外部网络的。
>
> Jeff Zhang  于2020年9月24日周四 下午3:15写道:
>
> > flink.execution.packages 下的包会从maven仓库下载,看这里关于如何在zeppelin里管理第三方依赖的方法
> > https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
> >
> >
> > 赵一旦  于2020年9月24日周四 下午3:09写道:
> >
> > > 通过zeppelin写sql,之前sql-client可行的sql总是报错。
> > >
> > > zeppelin配置的FLINK_HOME的目录中lib下是有相关包的,但远程remote集群没有。
> > > 之前sql-client方式是基于-l方式指定的。
> > >
> > >
> >
> zeppelin情况下,貌似有个flink.execution.packages,但是并没说明这个指定的包去哪找的?是zeppelin配置的FLINK_HOME中的lib嘛?我lib中有包,但还是报错。
> > >
> >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
>


-- 
Best Regards

Jeff Zhang


Re: zeppelin指定的依赖去哪找

2020-09-24 Thread Jeff Zhang
flink.execution.packages 下的包会从maven仓库下载,看这里关于如何在zeppelin里管理第三方依赖的方法
https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s


赵一旦  于2020年9月24日周四 下午3:09写道:

> 通过zeppelin写sql,之前sql-client可行的sql总是报错。
>
> zeppelin配置的FLINK_HOME的目录中lib下是有相关包的,但远程remote集群没有。
> 之前sql-client方式是基于-l方式指定的。
>
> zeppelin情况下,貌似有个flink.execution.packages,但是并没说明这个指定的包去哪找的?是zeppelin配置的FLINK_HOME中的lib嘛?我lib中有包,但还是报错。
>


-- 
Best Regards

Jeff Zhang


线上环境出现:org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Serializer,本地没有

2020-09-21 Thread Jeff
在本地IDEA里测试处理相同TOPIC正常,但在线上环境出现了这样的异常:
org.apache.kafka.common.serialization.StringSerializer is not an instance of 
org.apache.kafka.common.serialization.Serializer。将StringSerializer 换成 
ByteArraySerializer也是类似错误,不知道该如何解决该问题了。请问还有其它思路来解决这个问题吗? 
业务逻辑非常简单:从SOURCE表内过滤数据到sink表。
flink版本:1.11.1 kafka版本:2.1.0


SQL内KAFKA配置如下:
source:
create table ***
with (
'connector' = 'kafka',
'topic'='**',
'scan.startup.mode'='latest-offset',
'format'='json',
'properties.group.id' = '***',
'properties.bootstrap.servers'='***:9092',
'properties.enable.auto.commit'='true',
'properties.auto.commit.interval.ms'='1000',
'properties.key.deserializer'='org.apache.kafka.common.serialization.StringDeserializer',
'properties.value.deserializer'='org.apache.kafka.common.serialization.StringDeserializer',
'properties.security.protocol'='SASL_PLAINTEXT',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule
 required username=\"\" password=\"\";');


sink:
create table ***
with (
'connector' = 'kafka',
'topic'='***',
'scan.startup.mode'='latest-offset',
'format'='json',
'properties.bootstrap.servers'='*:9092',
'properties.max.poll.records'='50',
'properties.enable.auto.commit'='true',
'properties.auto.commit.interval.ms'='1000',
'properties.key.serializer'='org.apache.kafka.common.serialization.StringSerializer',
'properties.value.serializer'='org.apache.kafka.common.serialization.StringSerializer',
'properties.security.protocol'='SASL_PLAINTEXT',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule
 required username=\"*\" password=\"**\";');







Re: 多线程模式下使用Blink TableEnvironment

2020-09-18 Thread Jeff Zhang
Hi jun su,

如果是自建平台的话,可以考虑用zeppelin的sdk 来提交作业
https://www.yuque.com/jeffzhangjianfeng/gldg8w/pz2xoh





jun su  于2020年9月18日周五 上午10:59写道:

> hi godfrey,
>
> 我们的用法类似zeppelin, 项目形式类似notebook, 在第一次运行笔记时创建env,
> 再次运行notebook时会创建新线程来构建job运行, 所以我参考了zepplin的做法暂时fix了这个问题
>
> godfrey he  于2020年9月17日周四 下午10:07写道:
>
> > TableEnvironment 不是多线程安全的。
> >
> > btw, 你能描述一下你在多线程情况下怎么使用 TableEnvironment 的吗?
> >
> > Jeff Zhang  于2020年9月14日周一 下午12:10写道:
> >
> > > 参考zeppelin的做法,每个线程里都调用这个
> > >
> > >
> > >
> >
> https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java#L111
> > >
> > >
> > > jun su  于2020年9月14日周一 上午11:54写道:
> > >
> > > > hi all,
> > > >
> > > > 多线程模式下执行sql , 在非聚合sql时报了如下错误:
> > > >
> > > > Caused by: java.lang.NullPointerException
> > > >   at java.util.Objects.requireNonNull(Objects.java:203)
> > > >   at
> > > >
> > > >
> > >
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
> > > >   at
> > > >
> > > >
> > >
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
> > > >
> > > >
> > > >
> > > >
> > >
> >
> 已经用RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()))
> > > > 解决
> > > >
> > > >
> > > > 但是执行聚合sql时 , 仍然会报错, 请问有办法临时fix?
> > > >
> > > > Caused by: java.lang.NullPointerException
> > > > at scala.Predef$.Double2double(Predef.scala:365)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate.computeSelfCost(FlinkLogicalAggregate.scala:81)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:174)
> > > > at
> > > >
> > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown
> > > > Source)
> > > > at
> > > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown
> > > > Source)
> > > >
> > > > --
> > > > Best,
> > > > Jun Su
> > > >
> > >
> > >
> > > --
> > > Best Regards
> > >
> > > Jeff Zhang
> > >
> >
>
>
> --
> Best,
> Jun Su
>


-- 
Best Regards

Jeff Zhang


Re: 多线程模式下使用Blink TableEnvironment

2020-09-13 Thread Jeff Zhang
参考zeppelin的做法,每个线程里都调用这个

https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java#L111


jun su  于2020年9月14日周一 上午11:54写道:

> hi all,
>
> 多线程模式下执行sql , 在非聚合sql时报了如下错误:
>
> Caused by: java.lang.NullPointerException
>   at java.util.Objects.requireNonNull(Objects.java:203)
>   at
>
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
>   at
>
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
>
>
>
> 已经用RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()))
> 解决
>
>
> 但是执行聚合sql时 , 仍然会报错, 请问有办法临时fix?
>
> Caused by: java.lang.NullPointerException
> at scala.Predef$.Double2double(Predef.scala:365)
> at
>
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate.computeSelfCost(FlinkLogicalAggregate.scala:81)
> at
>
> org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:174)
> at
> GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown
> Source)
> at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown
> Source)
>
> --
> Best,
> Jun Su
>


-- 
Best Regards

Jeff Zhang


Re: 有木有比较好flink sql 任务提交插件推荐

2020-09-13 Thread Jeff Zhang
zeppelin 提交sql 也很方便,可以通过UI,也可以通过zeppelin sdk,
https://www.yuque.com/jeffzhangjianfeng/gldg8w/pz2xoh

钉钉群号:32803524
二维码:
[image: flink_on_zeppelin_2.png]

xuzh  于2020年9月14日周一 上午10:01写道:

> Dear all:
> 目前有找到两个sql任务提交插件:
> https://github.com/wuchong/flink-sql-submit
> https://github.com/springMoon/sqlSubmit
> 大家有木有用过,推荐一下



-- 
Best Regards

Jeff Zhang


Re: flink集成spring

2020-09-09 Thread Jeff Zhang
可以看看这个zeppelin sdk,,也许适合你
https://www.yuque.com/jeffzhangjianfeng/gldg8w/pz2xoh


1115098...@qq.com  于2020年9月10日周四 上午9:09写道:

> 大家好,我在将spring boot集成到flink的过程中,遇到很多问题,感觉不太兼容。看官方文档也没有集成spring
> boot的介绍,是不是flink设计的时候就没有考虑与spring boot的集成?



-- 
Best Regards

Jeff Zhang


Re: flink sql client 如何同时执行多条 sql 语句

2020-09-05 Thread Jeff Zhang
可以尝试下zeppelin,zeppelin是支持多条sql的。可以看这里的文档和加入钉钉群

https://www.yuque.com/jeffzhangjianfeng/gldg8w/te2l1c#WMEMY

钉钉群号:32803524
钉钉群二维码:
[image: flink_on_zeppelin_2.png]

faaron zheng  于2020年9月5日周六 上午9:37写道:

> Hi, sql-client目前应该是没有这个能力的,它是交互式执行的,我们之前在sql-client的基础上改过一个类似beeline
> -e/-f的脚本,主要修改的提交任务的地方。 Best, Faaron Zheng 在2020年09月04日 17:04,LittleFall 写道:
> 我有一个 sql 文件,它里面有不少 flink sql 的创建表的语句和查询语句,现在我想要通过 sql client
> 提交这些任务,却只能一句一句的复制粘贴。如果同时复制粘贴多条语句就会报错,在 flink sql client 中使用 source xxx.sql
> 也会报错。 请问用什么样的方法可以一次性执行多条语句呢? -- Sent from:
> http://apache-flink.147419.n8.nabble.com/



-- 
Best Regards

Jeff Zhang


Re: 基于flink1.10源码编译问题

2020-08-21 Thread Jeff Zhang
不要用aliyun maven repo,另外你这是1.10-SNAPSHOT 不是1.10的release版本

魏烽  于2020年8月21日周五 下午8:44写道:

> 各位好:
>
>
> 

Re: flink任务提交

2020-08-19 Thread Jeff Zhang
Zeppelin 最近在做这样的API来提交Flink Job,这里有个例子可以参考下
https://github.com/zjffdu/zeppelin/blob/ZEPPELIN-4981/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java#L307

可以加入钉钉群讨论,钉钉群号:32803524



Dream-底限  于2020年8月19日周三 下午4:27写道:

> hi、
> 请问你说的是自己拼接cli字符串,然后通过java调用子进程执行的这种方式吗
>
> 我先前也是这么做的,但感觉比较怪异,这种方式有一个问题是貌似没办法直接返回applicationId,要从日志里面筛选好像,再就是没办法判断提交是否成功,貌似也是从日志来做,请问这个applicationId在提交的时候除了从日志筛选有其他的获取方式吗
>
> wxpcc  于2020年8月19日周三 下午4:09写道:
>
> > 大概可以用,YarnClusterDescriptor
> >
> > 命令行方式封装提交对于后续升级更加方便一些,个人建议
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best Regards

Jeff Zhang


Re: FlinkSQL命令行方式提交sql任务的jar依赖问题

2020-08-13 Thread Jeff Zhang
你的10台机器是flink standalone还是 yarn集群 ?
如果是yarn集群,那么只要把依赖放到机器A就可以了,如果是standalone的话,就要在B集群上部署了依赖了。

另外你可以用zeppelin来提交flink sql 作业,zeppelin提供很多种添加第三方依赖的方法,具体可以参考文档
https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
或者加入钉钉群讨论,钉钉群号: 32803524


Zhao,Yi(SEC)  于2020年8月13日周四 下午1:02写道:

> 背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
> 现在比较混乱,哪些jar需要放到A,哪些放到B。
>
>
> (1) kafka ssl
> 证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。
>
> (2)
>  
> flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。
>
>
>
> 总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?
>
> 目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。
>
>
>

-- 
Best Regards

Jeff Zhang


Re: 数据预览

2020-08-01 Thread Jeff Zhang
Apache Zeppelin有自己的rest api,你可以用rest api来提交flink sql
以及拿sql结果,目前Zeppelin社区正在做一个Client API (Zeppelin SDK),
用户可以更加方便的调用Zeppelin的功能。具体可以参考
https://issues.apache.org/jira/browse/ZEPPELIN-4981

这里有Sample code 可以参考
https://github.com/zjffdu/zeppelin/blob/ZEPPELIN-4981/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java#L298

对于Flink on Zeppelin感兴趣的,可以加入钉钉群:32803524



forideal  于2020年8月1日周六 下午7:49写道:

> 你好,我的朋友
>
>
>最近我看 Flink doc 中的文档中有了如下 connector
>   DataGen
>   Print
>   BlackHole
>这大大的方便了开发和调试。不过,我还是不太满足,想了解一下数据预览相关的做法。
>比如我想,如果我有一个 Flink 的 `driver` ,然后,我使用这个 driver 提交一条 SQL,我从 ResultSet
> 中获取数据。这样又可以大大的方面我们的 Flink SQL 开发者。
>在社区中,我已经体验了 Apache Zeppelin ,他可以让我提交 Flink SQL,然后在页面上面等待刷新的结果,但是
> Zeppelin 目前不能很好的集成到我们的 Flink web IDE 中。想了解一下如何实现数据预览。
>
>
>Best forideal



-- 
Best Regards

Jeff Zhang


Re: 关于 sql-client

2020-07-24 Thread Jeff Zhang
可以用zeppelin来提交flink sql作业,可以加入钉钉群讨论:32803524

杨荣  于2020年7月24日周五 下午3:19写道:

> 你们可以 pr 到官方啊。我觉得这个功能很刚需啊,并且 basic 版的 1.5 就 release 了,不知道为什么相关 gate way 或者
> submit with sql file 的 feature 到现在都还没实现呢。
>
> Harold.Miao  于2020年7月24日周五 上午11:42写道:
>
> > 1 应该是可以的   主要是你要在flink-conf.yaml里面配置正确的 jobmanager.rpc.address
> > 源码里面有加载主配置文件的逻辑
> >
> > public LocalExecutor(URL defaultEnv, List jars, List
> libraries) {
> >// discover configuration
> >final String flinkConfigDir;
> >try {
> >   // find the configuration directory
> >   flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
> >
> >   // load the global configuration
> >   this.flinkConfig =
> > GlobalConfiguration.loadConfiguration(flinkConfigDir);
> >
> >   // initialize default file system
> >   FileSystem.initialize(flinkConfig,
> > PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
> >
> >   // load command lines for deployment
> >   this.commandLines =
> > CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir);
> >   this.commandLineOptions = collectCommandLineOptions(commandLines);
> >} catch (Exception e) {
> >   throw new SqlClientException("Could not load Flink configuration.",
> > e);
> >}
> >
> >
> > 2  因为等不及官方的  我们自己wrapper实现了一个
> >
> >
> >
> >
> > 杨荣  于2020年7月24日周五 上午10:53写道:
> >
> > > Hi all,
> > >
> > > 请问:
> > > 1. 在 Embedded mode 下,支持 ClusterClient 进行 job
> > > 提交作业,进行分布式计算吗?在文档中没看到,跟着文档走,只启起了 Local 在本地作业,无法运用到生产环境。
> > >
> > > 2. GateWay mode 预计在那个版本 release?
> > >
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 
Best Regards

Jeff Zhang


Re: FlinkSQL 任务提交后 任务名称问题

2020-07-18 Thread Jeff Zhang
在zeppelin中你可以指定insert 语句的job name,如下图,(对Zeppelin感兴趣的,可以加入钉钉群:32803524)

%flink.ssql(jobName="my job")

insert into sink_kafka select status, direction, cast(event_ts/10
as timestamp(3)) from source_kafka where status <> 'foo'

[image: image.png]

Evan  于2020年7月18日周六 下午5:47写道:

> 代码大概是这样子的,一张kafka source表,一张es Sink表,最后通过tableEnv.executeSql("insert into
> esSinkTable select ... from kafkaSourceTable")执行
> 任务提交后任务名称为“inset-into_某某catalog_某某database.某某Table”
>
>
> 这样很不友好啊,能不能我自己指定任务名称呢?



-- 
Best Regards

Jeff Zhang


Re: 代码中如何取消正在运行的Flink Streaming作业

2020-07-11 Thread Jeff Zhang
Zeppelin 能够帮你提交和cancel job,就是通过上面jianxu说的ClusterClient
api来做到的,对zeppelin感兴趣的话,可以参考这个视频

https://www.bilibili.com/video/BV1Te411W73b?p=21


jianxu  于2020年7月11日周六 下午4:52写道:

> Hi:
>
> 我想,你可能打算通过API的方式来取消正在运行的流任务。Flink任务提交时需要构建ClusterClient,提交成功后会返回任务对应的JobId。任务取消时,通过调用ClusterClient的cancel(JobID
> jobId)取消流任务。
> Flink源码可以看看 CliFrontend[1]中的逻辑,如果觉得比较麻烦可以参考
> https://github.com/todd5167/flink-spark-submiter
> 项目的任务提交部分,取消任务时构建ClusterClient即可。
>
>
>
>
>
>
>
>
>
>
> | |
> jianxu
> |
> |
> rjia...@163.com
> |
>
>
>
>
> 在2020年07月11日 16:19,Congxian Qiu 写道:
> Hi
>
> 如果你是想做一个作业管理的平台,可以尝试看一下 CliFrontend[1] 中相关的逻辑,对于 On Yarn
> 的作业,简单地说你需要能够正确的初始化一个 client 和 Yarn RM 交互,然后你需要知道 applicationId,另外你还需要知道
> flink 的 JobId,接下来就是调用 Flink 的接口了
>
> 如果像更多的了解参数如从和命令行传到 java 代码的,你可以自己写一个单元测试,单步调试一下整个流程。
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
>
> Best,
> Congxian
>
>
> godfrey he  于2020年7月9日周四 上午10:08写道:
>
> 可以通过 StreamExecutionEnvironment#executeAsync 提交作业,返回 JobClient [1], 通过
> JobClient 可以 cancel 作业,获取 job status。
>
> [1]
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API
>
> Best,
> Godfrey
>
> Evan  于2020年7月9日周四 上午9:40写道:
>
> 这个问题之前看到过有人在问,但是没有看到答案,我想问一下,Flink Streaming
> API有没有提供类似的接口,调用后就能停止这个Stream作业呢?
>
>

-- 
Best Regards

Jeff Zhang


Re: Trouble with large state

2020-06-22 Thread Jeff Henrikson

Bhaskar,

I think I am unstuck.  The performance numbers I sent after throttling 
were due to a one character error in business logic.  I think I now have 
something good enough to work with for now.  I will repost if I 
encounter further unexpected issues.


Adding application-level throttling ends up resolving both my symptom of 
slow/failing checkpoints, and also my symptom of crashes during long runs.


Many thanks!


Jeff


On 6/20/20 11:46 AM, Jeff Henrikson wrote:

Bhaskar,

 > Glad to know some progress.

Yeah, some progress.  Yet overnight run didn't look as good as I hoped.

The throttling required to not crash during snapshots seems to be quite 
different from the throttling required to crash not during snapshots. So 
the lowest common denominator is quite a large performance penalty.


What's worse, the rate of input that makes the snapshot performance go 
from good to bad seems to change significantly as the state size grows. 
Here is checkpoint history from an overnight run.


Parameters:

     - 30 minutes minimum between snapshots
     - incremental snapshot mode
     - inputs throttled to 100 events per sec per input per slot,
   which is around 1/4 of the unthrottled throughput

Checkpoint history:

 ID    Status    Acknowledged    Trigger Time    Latest 
Acknowledgement    End to End Duration    State Size    Buffered During 
Alignment
 12    COMPLETED    304/304    8:52:22    10:37:18    1h 44m 55s
60.5 GB    0 B
 11    COMPLETED    304/304    6:47:03    8:22:19    1h 35m 16s
53.3 GB    0 B
 10    COMPLETED    304/304    5:01:20    6:17:00    1h 15m 39s
41.0 GB    0 B
 9    COMPLETED    304/304    3:47:43    4:31:19    43m 35s    34.1 
GB    0 B
 8    COMPLETED    304/304    2:40:58    3:17:42    36m 43s    27.8 
GB    0 B
 7    COMPLETED    304/304    1:39:15    2:10:57    31m 42s    23.1 
GB    0 B
 6    COMPLETED    304/304    0:58:02    1:09:13    11m 11s    17.4 
GB    0 B
 5    COMPLETED    304/304    0:23:27    0:28:01    4m 33s    14.3 
GB    0 B
 4    COMPLETED    304/304    23:52:29    23:53:26    56s    12.7 
GB    0 B
 3    COMPLETED    304/304    23:20:59    23:22:28    1m 29s    10.8 
GB    0 B
 2    COMPLETED    304/304    22:46:17    22:50:58    4m 40s    7.40 
GB    0 B


As you can see, GB/minute varies drastically.  GB/minute also varies 
drastically with full checkpoint mode.


I'm pleased that it hasn't crashed yet.  Yet I'm concerned that with the 
checkpoint GB/minute getting so slow, it will crash soon.


I'm really wishing state.backend.async=false worked for 
RocksDbStateBackend.


I'm also wondering if my throttler would improve if I just connected to 
the REST api to ask if any checkpoint is in progress, and then paused 
inputs accordingly.  Effectively state.backend.async=false via hacked 
application code.


 > Where are you updating your state here? I
 > couldn't find any flink managed state here.

The only updates to state I make are through the built-in 
DataStream.cogroup.  A unit test (without RocksDB loaded) of the way I 
use .cogroup shows exactly two ways that .cogroup calls an 
implementation of AppendingState.add.  I summarize those below.


The two AppendingState subclasses invoked are HeapListState and 
HeapReducingState.  Neither have a support attribute on them, such as 
MapState's @PublicEvolving.


 > I suggested updating the flink managed state using onTimer over an
 > interval equal to the checkpoint interval.

So the onTimer method, with interval set to the checkpoint interval. 
Interesting.


It looks like the closest subclass for my use case use would be either 
KeyedCoProcessFunction.  Let me see if I understand concretely the idea:


1) between checkpoints, read join input and write join output, by 
loading any state reads from external state, but buffering all state 
changes in memory in some kind of data structure.


2) whenever a checkpoint arrived or the memory consumed by buffered 
writes gets too big, flush the writes to state.


Is that the gist of the idea about .onTimer?


Jeff



There are two paths from .coGroup to AppendingState.add

     path 1 of 2: .coGroup to HeapListState

     add:90, HeapListState {org.apache.flink.runtime.state.heap}
     processElement:203, EvictingWindowOperator 
{org.apache.flink.streaming.runtime.operators.windowing}
     processElement:164, StreamOneInputProcessor 
{org.apache.flink.streaming.runtime.io}
     processInput:143, StreamOneInputProcessor 
{org.apache.flink.streaming.runtime.io}



org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator#processElement 



   (windowAssigner is an instance of GlobalWindows)

     @Override
     public void processElement(StreamRecord element) 
throws Exception {
     final Collection elementWindows = 
windowAssigner.assignWindows(
     e

Re: Trouble with large state

2020-06-20 Thread Jeff Henrikson

Bhaskar,

> Glad to know some progress.

Yeah, some progress.  Yet overnight run didn't look as good as I hoped.

The throttling required to not crash during snapshots seems to be quite 
different from the throttling required to crash not during snapshots. 
So the lowest common denominator is quite a large performance penalty.


What's worse, the rate of input that makes the snapshot performance go 
from good to bad seems to change significantly as the state size grows. 
Here is checkpoint history from an overnight run.


Parameters:

- 30 minutes minimum between snapshots
- incremental snapshot mode
- inputs throttled to 100 events per sec per input per slot,
  which is around 1/4 of the unthrottled throughput

Checkpoint history:

	ID	Status	Acknowledged	Trigger Time	Latest Acknowledgement	End to End 
Duration	State Size	Buffered During Alignment

12  COMPLETED   304/304 8:52:22 10:37:181h 44m 55s  
60.5 GB 0 B
11  COMPLETED   304/304 6:47:03 8:22:19 1h 35m 16s  53.3 GB 
0 B
10  COMPLETED   304/304 5:01:20 6:17:00 1h 15m 39s  41.0 GB 
0 B
9   COMPLETED   304/304 3:47:43 4:31:19 43m 35s 34.1 GB 0 B
8   COMPLETED   304/304 2:40:58 3:17:42 36m 43s 27.8 GB 0 B
7   COMPLETED   304/304 1:39:15 2:10:57 31m 42s 23.1 GB 0 B
6   COMPLETED   304/304 0:58:02 1:09:13 11m 11s 17.4 GB 0 B
5   COMPLETED   304/304 0:23:27 0:28:01 4m 33s  14.3 GB 0 B
4   COMPLETED   304/304 23:52:2923:53:2656s 
12.7 GB 0 B
3   COMPLETED   304/304 23:20:5923:22:281m 29s  
10.8 GB 0 B
2   COMPLETED   304/304 22:46:1722:50:584m 40s  
7.40 GB 0 B

As you can see, GB/minute varies drastically.  GB/minute also varies 
drastically with full checkpoint mode.


I'm pleased that it hasn't crashed yet.  Yet I'm concerned that with the 
checkpoint GB/minute getting so slow, it will crash soon.


I'm really wishing state.backend.async=false worked for RocksDbStateBackend.

I'm also wondering if my throttler would improve if I just connected to 
the REST api to ask if any checkpoint is in progress, and then paused 
inputs accordingly.  Effectively state.backend.async=false via hacked 
application code.


> Where are you updating your state here? I
> couldn't find any flink managed state here.

The only updates to state I make are through the built-in 
DataStream.cogroup.  A unit test (without RocksDB loaded) of the way I 
use .cogroup shows exactly two ways that .cogroup calls an 
implementation of AppendingState.add.  I summarize those below.


The two AppendingState subclasses invoked are HeapListState and 
HeapReducingState.  Neither have a support attribute on them, such as 
MapState's @PublicEvolving.


> I suggested updating the flink managed state using onTimer over an
> interval equal to the checkpoint interval.

So the onTimer method, with interval set to the checkpoint interval. 
Interesting.


It looks like the closest subclass for my use case use would be either 
KeyedCoProcessFunction.  Let me see if I understand concretely the idea:


1) between checkpoints, read join input and write join output, by 
loading any state reads from external state, but buffering all state 
changes in memory in some kind of data structure.


2) whenever a checkpoint arrived or the memory consumed by buffered 
writes gets too big, flush the writes to state.


Is that the gist of the idea about .onTimer?


Jeff



There are two paths from .coGroup to AppendingState.add

path 1 of 2: .coGroup to HeapListState

add:90, HeapListState {org.apache.flink.runtime.state.heap}
processElement:203, EvictingWindowOperator 
{org.apache.flink.streaming.runtime.operators.windowing}
processElement:164, StreamOneInputProcessor 
{org.apache.flink.streaming.runtime.io}
processInput:143, StreamOneInputProcessor 
{org.apache.flink.streaming.runtime.io}



org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator#processElement

  (windowAssigner is an instance of GlobalWindows)

@Override
	public void processElement(StreamRecord element) 
throws Exception {
		final Collection elementWindows = 
windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), 
windowAssignerContext);


//if element is handled by none of assigned 
elementWindows
boolean isSkippedElement = true;

final K key = 
this.getKeyedStateBackend().getCurrentKey();

if (windowAssigner instanceof MergingWindowAssigner) {
. . .
} else {
for (W window : elementWindows) {

// check if the window is al

Re: Trouble with large state

2020-06-19 Thread Jeff Henrikson

Bhaskar,

Based on your idea of limiting input to get better checkpoint behavior, 
I made a ProcessFunction that constraints to a number of events per 
second per slot per input.  I do need to do some stateless input 
scanning before joins.  The stateless part needs to be fast and does no 
impact snapshots.  So I inserted the throttling after the input 
preprocessing but before the stateful transformations.  There is a 
significant difference of snapshot throughput (often 5x or larger) when 
I change the throttle between 200 and 300 events per second (per slot 
per input).


Hope the throttling keeps being effective as I keep the job running longer.

Odd.  But likely a very effective way out of my problem.

I wonder what drives it . . .  Thread contention?  IOPS contention?

See ProcessFunction code below.

Many thanks!


Jeff



import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Set eventsPerSecMax to -1 to disable the throttle
// TODO: Actual number of events can be slightly larger
// TODO: Remove pause correlation with system clock

case class Throttler[T](eventsPerSecMax : Double) extends 
ProcessFunction[T,T] {

  var minutePrev = 0
  var numEvents = 0
  def minutes() = {
val ms = System.currentTimeMillis()
(ms / 1000 / 60).toInt
  }
  def increment() = {
val m = minutes()
if(m != minutePrev) {
  numEvents = 0
}
numEvents += 1
  }
  def eps() = {
numEvents/60.0
  }
  override def processElement(x: T, ctx: ProcessFunction[T, T]#Context, 
out: Collector[T]): Unit = {

increment()
if(eventsPerSecMax > 0 && eps() > eventsPerSecMax) {
  Thread.sleep(1000L)
}
out.collect(x)
  }
}

On 6/19/20 9:16 AM, Jeff Henrikson wrote:

Bhaskar,

Thank you for your thoughtful points.

 > I want to discuss more on points (1) and (2)
 > If we take care of them  rest will be good
 >
 > Coming to (1)
 >
 > Please try to give reasonable checkpoint interval time for every job.
 > Minum checkpoint interval recommended by flink community is 3 minutes
 > I thin you should give minimum 3 minutes checkpoint interval for all

I have spent very little time testing with checkpoint intervals of under 
3 minutes.  I frequently test with intervals of 5 minutes and of 30 
minutes.  I also test with checkpoint intervals such as 60 minutes, and 
never (manual only).  In terms of which exceptions get thrown, I don't 
see much difference between 5/30/60, I don't see a lot of difference.


Infinity (no checkpoint internal) seems to be an interesting value, 
because before crashing, it seems to process around twice as much state 
as with any finite checkpoint interval.  The largest savepoints I have 
captured have been manually triggered using the /job/:jobid/stop REST 
API.  I think it helps for the snapshot to be synchronous.


One curiosity about the /job/:jobid/stop command is that from time of 
the command, it often takes many minutes for the internal processing to 
stop.


Another curiosity about /job/:jobid/stop command is that sometimes 
following a completed savepoint, the cluster goes back to running!


 > Coming to (2)
 >
 > What's your input data rate?

My application involves what I will call "main" events that are enriched 
by "secondary" events.  While the secondary events have several 
different input streams, data types, and join keys, I will estimate the 
secondary events all together.  My estimate for input rate is as follows:


     50M "main" events
     50 secondary events for each main event, for a
     total of around 2.5B input events
     8 nodes
     20 hours

Combining these figures, we can estimate:

     5000*50/8/20/3600 = 4340 events/second/node

I don't see how to act on your advice for (2).  Maybe your idea is that 
during backfill/bootstrap, I artificially throttle the inputs to my 
application?


100% of my application state is due to .cogroup, which manages a 
HeapListState on its own.  I cannot think of any controls for changing 
how .cogroup handles internal state per se.  I will paste below the 
Flink code path that .cogroup uses to update its internal state when it 
runs my application.


The only control I can think of with .cogroup that indirectly impacts 
internal state is delayed triggering.


Currently I use a trigger on every event, which I understand creates a 
suboptimal number of events.  I previously experimented with delayed 
triggering, but I did not get good results.


Just now I tried again ContinuousProcessingTimeTrigger of 30 seconds, 
with rocksdb.timer-service.factory: heap, and a 5 minute checkpoint 
interval.  The first checkpoint failed, which has been rare when I use 
all the same parameters except for triggering on every event.  So it 
looks worse not better.


Thanks again,


Jeff Henrikson




On 6/18/20 11:21 PM, Vijay Bhaskar wrote:

Thanks for the reply.
I want to discuss more o

Re: Trouble with large state

2020-06-19 Thread Jeff Henrikson

Bhaskar,

Thank you for your thoughtful points.

> I want to discuss more on points (1) and (2)
> If we take care of them  rest will be good
>
> Coming to (1)
>
> Please try to give reasonable checkpoint interval time for every job.
> Minum checkpoint interval recommended by flink community is 3 minutes
> I thin you should give minimum 3 minutes checkpoint interval for all

I have spent very little time testing with checkpoint intervals of under 
3 minutes.  I frequently test with intervals of 5 minutes and of 30 
minutes.  I also test with checkpoint intervals such as 60 minutes, and 
never (manual only).  In terms of which exceptions get thrown, I don't 
see much difference between 5/30/60, I don't see a lot of difference.


Infinity (no checkpoint internal) seems to be an interesting value, 
because before crashing, it seems to process around twice as much state 
as with any finite checkpoint interval.  The largest savepoints I have 
captured have been manually triggered using the /job/:jobid/stop REST 
API.  I think it helps for the snapshot to be synchronous.


One curiosity about the /job/:jobid/stop command is that from time of 
the command, it often takes many minutes for the internal processing to 
stop.


Another curiosity about /job/:jobid/stop command is that sometimes 
following a completed savepoint, the cluster goes back to running!


> Coming to (2)
>
> What's your input data rate?

My application involves what I will call "main" events that are enriched 
by "secondary" events.  While the secondary events have several 
different input streams, data types, and join keys, I will estimate the 
secondary events all together.  My estimate for input rate is as follows:


50M "main" events
50 secondary events for each main event, for a
total of around 2.5B input events
8 nodes
20 hours

Combining these figures, we can estimate:

5000*50/8/20/3600 = 4340 events/second/node

I don't see how to act on your advice for (2).  Maybe your idea is that 
during backfill/bootstrap, I artificially throttle the inputs to my 
application?


100% of my application state is due to .cogroup, which manages a 
HeapListState on its own.  I cannot think of any controls for changing 
how .cogroup handles internal state per se.  I will paste below the 
Flink code path that .cogroup uses to update its internal state when it 
runs my application.


The only control I can think of with .cogroup that indirectly impacts 
internal state is delayed triggering.


Currently I use a trigger on every event, which I understand creates a 
suboptimal number of events.  I previously experimented with delayed 
triggering, but I did not get good results.


Just now I tried again ContinuousProcessingTimeTrigger of 30 seconds, 
with rocksdb.timer-service.factory: heap, and a 5 minute checkpoint 
interval.  The first checkpoint failed, which has been rare when I use 
all the same parameters except for triggering on every event.  So it 
looks worse not better.


Thanks again,


Jeff Henrikson




On 6/18/20 11:21 PM, Vijay Bhaskar wrote:

Thanks for the reply.
I want to discuss more on points (1) and (2)
If we take care of them  rest will be good

Coming to (1)

Please try to give reasonable checkpoint interval time for every job. 
Minum checkpoint interval recommended by flink community is 3 minutes

I thin you should give minimum 3 minutes checkpoint interval for all

Coming to (2)

What's your input data rate?
For example you are seeing data at 100 msg/sec, For each message if 
there is state changing and you are updating the state with RocksDB, 
it's going to
create 100 rows in 1 second at RocksDb end, On the average if 50 records 
have changed each second, even if you are using RocksDB 
differentialstate = true,
there is no use. Because everytime 50% is new rows getting added. So the 
best bet is to update records with RocksDB only once in your checkpoint 
interval.
Suppose your checkpoint interval is 5 minutes. If you update RocksDB 
state once in 5 minutes, then the rate at which new records added to 
RocksDB  will be 1 record/5min.
Whereas in your original scenario, 3 records added to rocksDB in 5 
min. You can save 1:3 ratio of records in addition to RocksDB. Which 
will save a huge
redundant size addition to RocksDB. Ultimately your  state is driven by 
your checkpoint interval. From the input source you will go back 5 min 
back and read the state, similarly from RocksDB side
also you can have a state update once in 5 min should work. Otherwise 
even if you add state there is no use.


Regards
Bhaskar

Try to update your RocksDB state in an interval equal to the checkpoint 
interval. Otherwise in my case many times what's observed is

state size grows unnecessarily.

On Fri, Jun 19, 2020 at 12:42 AM Jeff Henrikson <mailto:jehenri...@gmail.com>> wrote:


Vijay,

Thanks for your thoughts.  Below are answers to 

Re: Trouble with large state

2020-06-18 Thread Jeff Henrikson

Vijay,

Thanks for your thoughts.  Below are answers to your questions.

> 1. What's your checkpoint interval?

I have used many different checkpoint intervals, ranging from 5 minutes 
to never.  I usually setMinPasueBetweenCheckpoints to the same value as 
the checkpoint interval.


> 2. How frequently are you updating the state into RocksDB?

My understanding is that for .cogroup:

  - Triggers control communication outside the operator
  - Evictors control cleanup of internal state
  - Configurations like write buffer size control the frequency of 
state change at the storage layer
  - There is no control for how frequently the window state updates at 
the layer of the RocksDB api layer.


Thus, the state update whenever data is ingested.

> 3. How many task managers are you using?

Usually I have been running with one slot per taskmanager.  28GB of 
usable ram on each node.


> 4. How much data each task manager handles while taking the checkpoint?

Funny you should ask.  I would be okay with zero.

The application I am replacing has a latency of 36-48 hours, so if I had 
to fully stop processing to take every snapshot synchronously, it might 
be seen as totally acceptable, especially for initial bootstrap.  Also, 
the velocity of running this backfill is approximately 115x real time on 
8 nodes, so the steady-state run may not exhibit the failure mode in 
question at all.


It has come as some frustration to me that, in the case of 
RocksDBStateBackend, the configuration key state.backend.async 
effectively has no meaningful way to be false.


The only way I have found in the existing code to get a behavior like 
synchronous snapshot is to POST to /jobs//stop with drain=false 
and a URL.  This method of failing fast is the way that I discovered 
that I needed to increase transfer threads from the default.


The reason I don't just run the whole backfill and then take one 
snapshot is that even in the absence of checkpoints, a very similar 
congestion seems to take the cluster down when I am say 20-30% of the 
way through my backfill.


Reloading from my largest feasible snapshot makes it possible to make 
another snapshot a bit larger before crash, but not by much.


On first glance, the code change to allow RocksDBStateBackend into a 
synchronous snapshots mode looks pretty easy.  Nevertheless, I was 
hoping to do the initial launch of my application without needing to 
modify the framework.


Regards,


Jeff Henrikson


On 6/18/20 7:28 AM, Vijay Bhaskar wrote:

For me this seems to be an IO bottleneck at your task manager.
I have a couple of queries:
1. What's your checkpoint interval?
2. How frequently are you updating the state into RocksDB?
3. How many task managers are you using?
4. How much data each task manager handles while taking the checkpoint?

For points (3) and (4) , you should be very careful. I feel you are 
stuck at this.
You try to scale vertically by increasing more CPU and memory for each 
task manager.

If not, try to scale horizontally so that each task manager IO gets reduces
Apart from that check is there any bottleneck with the file system.

Regards
Bhaskar





On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor <mailto:vict...@gmail.com>> wrote:


I had a similar problem.   I ended up solving by not relying on
checkpoints for recovery and instead re-read my input sources (in my
case a kafka topic) from the earliest offset and rebuilding only the
state I need.  I only need to care about the past 1 to 2 days of
state so can afford to drop anything older.   My recovery time went
from over an hour for just the first checkpoint to under 10 minutes.

Tim

On Wed, Jun 17, 2020, 11:52 PM Yun Tang mailto:myas...@live.com>> wrote:

Hi Jeff

 1. "after around 50GB of state, I stop being able to reliably
take checkpoints or savepoints. "
What is the exact reason that job cannot complete
checkpoint? Expired before completing or decline by some
tasks? The former one is manly caused by high back-pressure
and the later one is mainly due to some internal error.
 2. Have you checked what reason the remote task manager is lost?
If the remote task manager is not crashed, it might be due
to GC impact, I think you might need to check task-manager
logs and GC logs.

Best
Yun Tang

*From:* Jeff Henrikson mailto:jehenri...@gmail.com>>
*Sent:* Thursday, June 18, 2020 1:46
*To:* user mailto:user@flink.apache.org>>
*Subject:* Trouble with large state
Hello Flink users,

I have an application of around 10 enrichment joins.  All events
are
read from kafka and have event timestamps.  The joins are built
using
.cogroup, with a

Re: Trouble with large state

2020-06-18 Thread Jeff Henrikson

Hi Yun,

Thanks for your thoughts.  Answers to your questions:

>  1. "after around 50GB of state, I stop being able to reliably take
> checkpoints or savepoints. "
> What is the exact reason that job cannot complete checkpoint?
> Expired before completing or decline by some tasks? The former one
> is manly caused by high back-pressure and the later one is mainly
> due to some internal error.

In the UI, under Job | Checkpoints | History, then opening the 
checkpoint detail, the checkpoints fail by not some operators not 
acknowledging.  It's always a subset of of the larger state operators 
that stop acknowledging.  The exact selection of operators that stop is 
nondeterministic.  The checkpoints frequently fail before any timeout 
that I impose on them.


>  2. Have you checked what reason the remote task manager is lost?
> If the remote task manager is not crashed, it might be due to GC
> impact, I think you might need to check task-manager logs and GC 
logs.


The only general pattern I have observed is:

1) Some taskmanager A throws one of the various connectivity
exceptions I listed complaining about another taskmanager B.
2) Taskmanager B shows no obvious error other than complaining
that taskmanager A has disconnected from it.

Regards,


Jeff Henrikson



On 6/17/20 9:52 PM, Yun Tang wrote:

Hi Jeff

 1. "after around 50GB of state, I stop being able to reliably take
checkpoints or savepoints. "
What is the exact reason that job cannot complete checkpoint?
Expired before completing or decline by some tasks? The former one
is manly caused by high back-pressure and the later one is mainly
due to some internal error.
 2. Have you checked what reason the remote task manager is lost?
If the remote task manager is not crashed, it might be due to GC
impact, I think you might need to check task-manager logs and GC logs.

Best
Yun Tang
----
*From:* Jeff Henrikson 
*Sent:* Thursday, June 18, 2020 1:46
*To:* user 
*Subject:* Trouble with large state
Hello Flink users,

I have an application of around 10 enrichment joins.  All events are
read from kafka and have event timestamps.  The joins are built using
.cogroup, with a global window, triggering on every 1 event, plus a
custom evictor that drops records once a newer record for the same ID
has been processed.  Deletes are represented by empty events with
timestamp and ID (tombstones). That way, we can drop records when
business logic dictates, as opposed to when a maximum retention has been
attained.  The application runs RocksDBStateBackend, on Kubernetes on
AWS with local SSDs.

Unit tests show that the joins produce expected results.  On an 8 node
cluster, watermark output progress seems to indicate I should be able to
bootstrap my state of around 500GB in around 1 day.  I am able to save
and restore savepoints for the first half an hour of run time.

My current trouble is that after around 50GB of state, I stop being able
to reliably take checkpoints or savepoints.  Some time after that, I
start getting a variety of failures where the first suspicious log event
is a generic cluster connectivity error, such as:

  1) java.io.IOException: Connecting the channel failed: Connecting
  to remote task manager + '/10.67.7.101:38955' has failed. This
  might indicate that the remote task manager has been lost.

  2) org.apache.flink.runtime.io.network.netty.exception
  .RemoteTransportException: Connection unexpectedly closed by remote
  task manager 'null'. This might indicate that the remote task
  manager was lost.

  3) Association with remote system
  [akka.tcp://flink@10.67.6.66:34987] has failed, address is now
  gated for [50] ms. Reason: [Association failed with
  [akka.tcp://flink@10.67.6.66:34987]] Caused by:
  [java.net.NoRouteToHostException: No route to host]

I don't see any obvious out of memory errors on the TaskManager UI.

Adding nodes to the cluster does not seem to increase the maximum
savable state size.

I could enable HA, but for the time being I have been leaving it out to
avoid the possibility of masking deterministic faults.

Below are my configurations.

Thanks in advance for any advice.

Regards,


Jeff Henrikson



Flink version: 1.10

Configuration set via code:
  parallelism=8
  maxParallelism=64
  setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
  setTolerableCheckpointFailureNumber(1000)
  setMaxConcurrentCheckpoints(1)

enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  RocksDBStateBackend
  setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
  setNumberOfTransferThreads(25)
  setDbStoragePath points to a local nvme SSD

Configuration in flink-conf

Trouble with large state

2020-06-17 Thread Jeff Henrikson

Hello Flink users,

I have an application of around 10 enrichment joins.  All events are 
read from kafka and have event timestamps.  The joins are built using 
.cogroup, with a global window, triggering on every 1 event, plus a 
custom evictor that drops records once a newer record for the same ID 
has been processed.  Deletes are represented by empty events with 
timestamp and ID (tombstones). That way, we can drop records when 
business logic dictates, as opposed to when a maximum retention has been 
attained.  The application runs RocksDBStateBackend, on Kubernetes on 
AWS with local SSDs.


Unit tests show that the joins produce expected results.  On an 8 node 
cluster, watermark output progress seems to indicate I should be able to 
bootstrap my state of around 500GB in around 1 day.  I am able to save 
and restore savepoints for the first half an hour of run time.


My current trouble is that after around 50GB of state, I stop being able 
to reliably take checkpoints or savepoints.  Some time after that, I 
start getting a variety of failures where the first suspicious log event 
is a generic cluster connectivity error, such as:


1) java.io.IOException: Connecting the channel failed: Connecting
to remote task manager + '/10.67.7.101:38955' has failed. This
might indicate that the remote task manager has been lost.

2) org.apache.flink.runtime.io.network.netty.exception
.RemoteTransportException: Connection unexpectedly closed by remote
task manager 'null'. This might indicate that the remote task
manager was lost.

3) Association with remote system
[akka.tcp://flink@10.67.6.66:34987] has failed, address is now
gated for [50] ms. Reason: [Association failed with
[akka.tcp://flink@10.67.6.66:34987]] Caused by:
[java.net.NoRouteToHostException: No route to host]

I don't see any obvious out of memory errors on the TaskManager UI.

Adding nodes to the cluster does not seem to increase the maximum 
savable state size.


I could enable HA, but for the time being I have been leaving it out to 
avoid the possibility of masking deterministic faults.


Below are my configurations.

Thanks in advance for any advice.

Regards,


Jeff Henrikson



Flink version: 1.10

Configuration set via code:
parallelism=8
maxParallelism=64
setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
setTolerableCheckpointFailureNumber(1000)
setMaxConcurrentCheckpoints(1)

enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
RocksDBStateBackend
setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
setNumberOfTransferThreads(25)
setDbStoragePath points to a local nvme SSD

Configuration in flink-conf.yaml:

jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.size: 28000m
taskmanager.memory.process.size: 28000m
taskmanager.memory.jvm-metaspace.size: 512m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
jobmanager.execution.failover-strategy: full

cluster.evenly-spread-out-slots: false

taskmanager.memory.network.fraction: 0.2   # default 0.1
taskmanager.memory.framework.off-heap.size: 2GB
taskmanager.memory.task.off-heap.size: 2GB
taskmanager.network.memory.buffers-per-channel: 32 # default 2
taskmanager.memory.managed.fraction: 0.4   # docs say 
default 0.1, but something seems to set 0.4

taskmanager.memory.task.off-heap.size: 2048MB  # default 128M

state.backend.fs.memory-threshold: 1048576
state.backend.fs.write-buffer-size: 1024
state.backend.local-recovery: true
state.backend.rocksdb.writebuffer.size: 64MB
state.backend.rocksdb.writebuffer.count: 8
state.backend.rocksdb.writebuffer.number-to-merge: 4
state.backend.rocksdb.timer-service.factory: heap
state.backend.rocksdb.block.cache-size: 6400 # default 8MB
state.backend.rocksdb.write-batch-size: 1600 # default 2MB

web.checkpoints.history: 250


Re: flink-s3-fs-hadoop retry configuration

2020-06-17 Thread Jeff Henrikson

Robert,

Thanks for the tip!

Before you replied, I did figure out to put the keys in flink-conf.yaml, 
using btrace.  I instrumented the methods 
org.apache.hadoop.conf.Configuration.get for the keys, and 
org.apache.hadoop.conf.Configuration.substituteVars for effective 
values.  (There is a btrace bug where you can't just observe the return 
value from .get directly.)


I did not see in the code any way to observe the effective configuration 
using logging.


Regards,


Jeff



On 5/8/20 7:29 AM, Robert Metzger wrote:

I validated my assumption. Putting

s3.connection.maximum: 123456

into the flink-conf.yaml file results in the following DEBUG log output:

2020-05-08 16:20:47,461 DEBUG 
org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader       [] - Adding 
Flink config entry for s3.connection.maximum as 
fs.s3a.connection.maximum to Hadoop config


I guess that is the recommended way of passing configuration into the S3 
connectors of Flink.


You also asked how to detect retries: DEBUG-log level is helpful again. 
I just tried connecting against an invalid port, and got these messages:


2020-05-08 16:26:37,671 DEBUG 
org.apache.http.impl.conn.DefaultManagedHttpClientConnection [] - 
http-outgoing-7: Shutdown connection
2020-05-08 16:26:37,671 DEBUG 
org.apache.http.impl.execchain.MainClientExec                [] - 
Connection discarded
2020-05-08 16:26:37,671 DEBUG 
org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] - 
Connection released: [id: 7][route: {}->http://127.0.0.1:9000][total 
kept alive: 0; route allocated: 0 of 123456; total allocated: 0 of 123456]
2020-05-08 16:26:37,671 DEBUG com.amazonaws.request 
                    [] - Retrying Request: HEAD http://127.0.0.1:9000 
/test/ Headers: (User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.271 
Mac_OS_X/10.15.3 OpenJDK_64-Bit_Server_VM/25.252-b09 java/1.8.0_252 
scala/2.11.12, amz-sdk-invocation-id: 
051f9877-1c22-00ed-ad26-8361bcf14b98, Content-Type: 
application/octet-stream, )
2020-05-08 16:26:37,671 DEBUG com.amazonaws.http.AmazonHttpClient   
                    [] - Retriable error detected, will retry in 4226ms, 
attempt number: 7



maybe it makes sense to set the log level only for 
"com.amazonaws.http.AmazonHttpClient" to DEBUG.


How to configure the log level depends on the deployment method. 
Usually, its done by replacing the first INFO with DEBUG in 
conf/log4j.properties. ("rootLogger.level = DEBUG")



Best,
Robert

On Fri, May 8, 2020 at 3:51 PM Robert Metzger <mailto:rmetz...@apache.org>> wrote:


Hey Jeff,

Which Flink version are you using?
Have you tried configuring the S3 filesystem via Flink's  config
yaml? Afaik all config parameters prefixed with "s3." are mirrored
into the Hadoop file system connector.


On Mon, May 4, 2020 at 8:45 PM Jeff Henrikson mailto:jehenri...@gmail.com>> wrote:

  > 2) How can I tell if flink-s3-fs-hadoop is actually managing
to pick up
  > the hadoop configuration I have provided, as opposed to some
separate
  > default configuration?

I'm reading the docs and source of flink-fs-hadoop-shaded.  I
see that
core-default-shaded.xml has fs.s3a.connection.maximum set to
15.  I have
around 20 different DataStreams being instantiated from S3, so
if they
each require one connection to be healthy, then 15 is definitely
not a
good value.

However, I seem to be unable to override
fs.s3a.connection.maximum using
my core-site.xml.  I am also unable to see the DEBUG level
messages for
the shaded flink-fs-hadoop-shaded if I set log4j.rootLogger=DEBUG.

So now I'm wondering:

      1) Anybody know how to see DEBUG output for
flink-fs-hadoop-shaded?

      2) Am I going to end up rebuilding flink-fs-hadoop-shaded to
      override the config?


Thanks in advance,


Jeff Henrikson




https://github.com/apache/flink/tree/master/flink-filesystems/flink-fs-hadoop-shaded


https://github.com/apache/flink/blob/master/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/core-default-shaded.xml

    
      fs.s3a.connection.maximum
      15
      Controls the maximum number of simultaneous
connections to S3.
    




On 5/1/20 7:30 PM, Jeff Henrikson wrote:
 > Hello Flink users,
 >
 > I could use help with three related questions:
 >
 > 1) How can I observe retries in the flink-s3-fs-hadoop connector?
 >
 > 2) How can I tell if flink-s3-fs-hadoop is actually managing
to pick up
 > the hadoop configuration I have provided, as opposed to some
separate
 > default configuration?  My job fails quickly w

Re: pyflink数据查询

2020-06-09 Thread Jeff Zhang
可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程
https://www.bilibili.com/video/BV1Te411W73b?p=20
可以加入钉钉群讨论:30022475



jack  于2020年6月9日周二 下午5:28写道:

> 问题请教:
> 描述: pyflink 从source通过sql对数据进行查询聚合等操作
> 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
>
> flink能否实现这样的方式?
> 感谢
>


-- 
Best Regards

Jeff Zhang


Re: pyflink数据查询

2020-06-09 Thread Jeff Zhang
可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程
https://www.bilibili.com/video/BV1Te411W73b?p=20
可以加入钉钉群讨论:30022475



jack  于2020年6月9日周二 下午5:28写道:

> 问题请教:
> 描述: pyflink 从source通过sql对数据进行查询聚合等操作
> 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
>
> flink能否实现这样的方式?
> 感谢
>


-- 
Best Regards

Jeff Zhang


Re: Run command after Batch is finished

2020-06-06 Thread Jeff Zhang
It would run in the client side where ExecutionEnvironment is created.

Mark Davis  于2020年6月6日周六 下午8:14写道:

> Hi Jeff,
>
> Thank you very much! That is exactly what I need.
>
> Where the listener code will run in the cluster deployment(YARN, k8s)?
> Will it be sent over the network?
>
> Thank you!
>
>   Mark
>
> ‐‐‐ Original Message ‐‐‐
> On Friday, June 5, 2020 6:13 PM, Jeff Zhang  wrote:
>
> You can try JobListener which you can register to ExecutionEnvironment.
>
>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
>
>
> Mark Davis  于2020年6月6日周六 上午12:00写道:
>
>> Hi there,
>>
>> I am running a Batch job with several outputs.
>> Is there a way to run some code(e.g. release a distributed lock) after
>> all outputs are finished?
>>
>> Currently I do this in a try-finally block around
>> ExecutionEnvironment.execute() call, but I have to switch to the detached
>> execution mode - in this mode the finally block is never run.
>>
>> Thank you!
>>
>>   Mark
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>

-- 
Best Regards

Jeff Zhang


Re: Run command after Batch is finished

2020-06-05 Thread Jeff Zhang
You can try JobListener which you can register to ExecutionEnvironment.

https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java


Mark Davis  于2020年6月6日周六 上午12:00写道:

> Hi there,
>
> I am running a Batch job with several outputs.
> Is there a way to run some code(e.g. release a distributed lock) after all
> outputs are finished?
>
> Currently I do this in a try-finally block around
> ExecutionEnvironment.execute() call, but I have to switch to the detached
> execution mode - in this mode the finally block is never run.
>
> Thank you!
>
>   Mark
>


-- 
Best Regards

Jeff Zhang


Re: Table Environment for Remote Execution

2020-06-03 Thread Jeff Zhang
Hi Satyam,

I also meet the same issue when I integrate flink with zeppelin. Here's
what I did.

https://github.com/apache/zeppelin/blob/master/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java#L226

If you are interested in flink on zeppelin, you can refer the following
blogs and videos.

Flink on Zeppelin video
https://www.youtube.com/watch?v=YxPo0Fosjjg=PL4oy12nnS7FFtg3KV1iS5vDb0pTz12VcX

Flink on Zeppelin tutorial blogs: 1) Get started
https://link.medium.com/oppqD6dIg5
<https://www.youtube.com/redirect?q=https%3A%2F%2Flink.medium.com%2FoppqD6dIg5_token=qt__OhcVRsf8bhBrj4t08HvKm_l8MTU5MTMyNTYwMkAxNTkxMjM5MjAy=YxPo0Fosjjg=video_description>
2) Batch https://link.medium.com/3qumbwRIg5
<https://www.youtube.com/redirect?q=https%3A%2F%2Flink.medium.com%2F3qumbwRIg5_token=qt__OhcVRsf8bhBrj4t08HvKm_l8MTU5MTMyNTYwMkAxNTkxMjM5MjAy=YxPo0Fosjjg=video_description>
3) Streaming https://link.medium.com/RBHa2lTIg5
<https://www.youtube.com/redirect?q=https%3A%2F%2Flink.medium.com%2FRBHa2lTIg5_token=qt__OhcVRsf8bhBrj4t08HvKm_l8MTU5MTMyNTYwMkAxNTkxMjM5MjAy=YxPo0Fosjjg=video_description>
4) Advanced usage https://link.medium.com/CAekyoXIg5
<https://www.youtube.com/redirect?q=https%3A%2F%2Flink.medium.com%2FCAekyoXIg5_token=qt__OhcVRsf8bhBrj4t08HvKm_l8MTU5MTMyNTYwMkAxNTkxMjM5MjAy=YxPo0Fosjjg=video_description>



Satyam Shekhar  于2020年6月4日周四 上午2:27写道:

>
> Thanks, Jark & Godfrey.
>
> The workaround was successful.
>
> I have created the following ticket to track the issue -
> https://issues.apache.org/jira/browse/FLINK-18095
>
> Regards,
> Satyam
>
> On Wed, Jun 3, 2020 at 3:26 AM Jark Wu  wrote:
>
>> Hi Satyam,
>>
>> In the long term, TableEnvironment is the entry point for pure Table/SQL
>> users. So it should have all the ability of StreamExecutionEnvironment.
>> I think remote execution is a reasonable feature, could you create an
>> JIRA issue for this?
>>
>> As a workaround, you can construct `StreamTableEnvironmentImpl` by
>> yourself via constructor, it can support batch mode
>> and StreamExecutionEnvironment.
>>
>> Best,
>> Jark
>>
>>
>> On Wed, 3 Jun 2020 at 16:35, Satyam Shekhar 
>> wrote:
>>
>>> Thanks for the reply, Godfrey.
>>>
>>> I would also love to learn the reasoning behind that limitation.
>>>
>>> For more context, I am building a Java application that receives some
>>> user input via a GRPC service. The user's input is translated to some SQL
>>> that may be executed in streaming or batch mode based on custom business
>>> logic and submitted it to Flink for execution. In my current setup, I
>>> create an ExecutionEnvironment, register sources, and execute the
>>> corresponding SQL. I was able to achieve the desired behavior with
>>> StreamTableEnvironment but it has limitations around supported SQL in batch
>>> mode.
>>>
>>> While invoking the CLI from java program might be a solution, it doesn't
>>> feel like the most natural solution for the problem. Are there other ways
>>> to address this?
>>>
>>> Regards,
>>> Satyam
>>>
>>> On Wed, Jun 3, 2020 at 12:50 AM godfrey he  wrote:
>>>
>>>> Hi Satyam,
>>>>
>>>> for blink batch mode, only TableEnvironment can be used,
>>>> and TableEnvironment do not take StreamExecutionEnvironment as argument.
>>>> Instead StreamExecutionEnvironment instance is created internally.
>>>>
>>>> back to your requirement, you can build your table program as user jar,
>>>> and submit the job through flink cli [1] to remote environment.
>>>>
>>>> Bests,
>>>> Godfrey
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html
>>>>
>>>>
>>>>
>>>> Satyam Shekhar  于2020年6月3日周三 下午2:59写道:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am running into a very basic problem while working with Table API. I
>>>>> wish to create a TableEnvironment connected to a remote environment that
>>>>> uses Blink planner in batch mode. Examples and documentation I have come
>>>>> across so far recommend the following pattern to create such an 
>>>>> environment
>>>>> -
>>>>>
>>>>> var settings = EnvironmentSettings.newInstance()
>>>>>   .useBlinkPlanner()
>>>>>   .inBatchMode()
>>>>>   .build();
>>>>> var tEnv = TableEnvironment.create(settings);
>>>>>
>>>>> The above configuration, however, does not connect to a remote
>>>>> environment. Tracing code in TableEnvironment.java, I see the
>>>>> following method in BlinkExecutorFactory.java that appears to
>>>>> relevant -
>>>>>
>>>>> Executor create(Map, StreamExecutionEnvironment);
>>>>>
>>>>> However, it seems to be only accessible through the Scala bridge. I
>>>>> can't seem to find a way to instantiate a TableEnvironment that takes
>>>>> StreamExecutionEnvironment as an argument. How do I achieve that?
>>>>>
>>>>> Regards,
>>>>> Satyam
>>>>>
>>>>

-- 
Best Regards

Jeff Zhang


Re: 關於LocalExecutionEnvironment使用MiniCluster的配置

2020-05-24 Thread Jeff Zhang
在zeppelin也集成了flink的local 模式,可以通过设置 local.number-taskmanager 和
flink.tm.slot来设置tm和slot的数目,
具体可以参考这个视频 https://www.bilibili.com/video/BV1Te411W73b?p=3

tison  于2020年5月24日周日 下午9:46写道:

> 是这样的。
>
> 这里的配置可以参考[1][2]两个类,具体你 Maven 启动的代码路径还跟[3][4]有关。
>
> 这边可能确实文档比较缺失。可以看下配置传递的路径,TM 的数量还有 RPC 的共享格式等配置,至少编程接口上都是可以配的。
>
> Best,
> tison.
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
> [2]
>
> https://github.com/apache/flink/blob/ab947386ed93b16019f36c50e9a3475dd6ad3c4a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
> [3]
>
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
> [4]
>
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
>
>
>
>
> 月月  于2020年5月24日周日 下午9:11写道:
>
> > 您好,
> > 在單機模式使用maven執行專案時,會自動啟動MiniCluster,
> > 我想請問在這種情形下,預設是配置一個JobManager以及一個TaskManager嗎?
> >
> > 找了一下文件中並沒有相關的說明。
> >
> > 感謝!
> >
>


-- 
Best Regards

Jeff Zhang


Re: 按照官网进行flink-shell操作,出现无法解决的错误:Only BatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment.

2020-05-21 Thread Jeff Zhang
可以在zeppelin里写scala代码,是支持hive的,参考这个视频,
https://www.bilibili.com/video/BV1Te411W73b?p=10

也可以到这个钉钉群讨论: 30022475

Jingsong Li  于2020年5月21日周四 下午4:43写道:

> Hi,
>
> 不好意思,现在版本hive connector已经不支持old planner了,
> 但是scala shell还是默认old planner。
>
> Best,
> Jingsong Lee
>
> On Thu, May 21, 2020 at 3:24 PM 张锴  wrote:
>
> > 具体操作及错误信息我贴到下面,各位大佬帮忙看下如何解决,不知道是不是BUG。
> >
> > scala> import org.apache.flink.table.catalog.hive.HiveCatalog
> > import org.apache.flink.table.catalog.hive.HiveCatalog
> >
> > scala> val hiveCatalog = new HiveCatalog("hive", "mydatabase",
> > "/opt/hive2.3.3/conf", "2.3.3");
> > hiveCatalog: org.apache.flink.table.catalog.hive.HiveCatalog =
> > org.apache.flink.table.catalog.hive.HiveCatalog@193f3306
> >
> > scala> btenv.registerCatalog("hive", hiveCatalog)
> > Thu May 21 15:10:05 CST 2020 WARN: Establishing SSL connection without
> > server's identity verification is not recommended. According to MySQL
> > 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be
> established
> > by default if explicit option isn't set. For compliance with existing
> > applications not using SSL the verifyServerCertificate property is set to
> > 'false'. You need either to explicitly disable SSL by setting
> useSSL=false,
> > or set useSSL=true and provide truststore for server certificate
> > verification.
> > Thu May 21 15:10:05 CST 2020 WARN: Establishing SSL connection without
> > server's identity verification is not recommended. According to MySQL
> > 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be
> established
> > by default if explicit option isn't set. For compliance with existing
> > applications not using SSL the verifyServerCertificate property is set to
> > 'false'. You need either to explicitly disable SSL by setting
> useSSL=false,
> > or set useSSL=true and provide truststore for server certificate
> > verification.
> >
> > scala> btenv.useCatalog("hive")
> >
> > scala> btenv.listTables
> > res2: Array[String] = Array(mytable)
> >
> > scala> btenv.sqlQuery("select * from mytable").toDataSet[Row].print()
> > org.apache.flink.table.api.TableException: Only BatchTableSource and
> > InputFormatTableSource are supported in BatchTableEnvironment.
> >   at
> >
> >
> org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:118)
> >   at
> >
> >
> org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:306)
> >   at
> >
> >
> org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:281)
> >   at
> >
> >
> org.apache.flink.table.api.scala.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:69)
> >   at
> >
> >
> org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:53)
> >   ... 30 elided
> >
>
>
> --
> Best, Jingsong Lee
>


-- 
Best Regards

Jeff Zhang


Re:Re: flink1.10怎么获得flink-shaded-hadoop包以支持hadoop3.2.1?

2020-05-20 Thread Jeff
好的,刚刚也查了其它资料,flink现在还不支持hadoop3,但用2.X的包也是可以的,只要不用到hadoop3特有API就行了

















在 2020-05-20 09:22:39,"刘大龙"  写道:
>Hi,
>你可以看一下这两个链接:
>1: https://www.mail-archive.com/dev@flink.apache.org/msg37293.html
>2: https://issues.apache.org/jira/browse/FLINK-11086
>> -原始邮件-
>> 发件人: Jeff 
>> 发送时间: 2020-05-20 10:09:10 (星期三)
>> 收件人: flink-zh 
>> 抄送: 
>> 主题: flink1.10怎么获得flink-shaded-hadoop包以支持hadoop3.2.1?
>> 
>> hi all,
>> 在mvnrepository里没找到支持hadoop3.2.1的flink-shaded-hadoop包, 
>> 在单独的flink-shaded项目里也没找到相应hadoop模块,请问我要怎么获得这个包呢?
>
>
>--
>刘大龙
>
>浙江大学 控制系 智能系统与控制研究所 工控新楼217
>地址:浙江省杭州市浙大路38号浙江大学玉泉校区
>Tel:18867547281


flink1.10怎么获得flink-shaded-hadoop包以支持hadoop3.2.1?

2020-05-19 Thread Jeff
hi all,
在mvnrepository里没找到支持hadoop3.2.1的flink-shaded-hadoop包, 
在单独的flink-shaded项目里也没找到相应hadoop模块,请问我要怎么获得这个包呢?

Re:Re: 有什么方式可以获得各个task占用内存情况呢

2020-05-15 Thread Jeff



好的,谢谢














在 2020-05-15 14:48:15,"Xintong Song"  写道:
>Hi Jeff,
>
>Flink 目前没有 task 级别的内存统计。原因是运行在同一个 JVM
>进程中的不同线程的内存开销,是很难区分开的。如果真要逐个线程进行内存分析,代价会比较高,不适合在运行时进行统计。如果需要对 task
>的内存开销进行深入分析的话,可能需要借助一些 profiling 工具对某一时刻的 heap dump 进行分析。
>
>Thank you~
>
>Xintong Song
>
>
>
>On Fri, May 15, 2020 at 2:52 PM Jeff  wrote:
>
>> hi all,
>>
>>
>> 我用per-job方式提交了一批任务,请问有什么方式知道每个任务内存消耗情况呢?


Re:回复:怎么排查taskmanager频繁挂掉的原因?

2020-05-15 Thread Jeff



不是,是用per-job方式提交的














在 2020-05-15 14:14:20,"shao.hongxiao" <17611022...@163.com> 写道:
>你的是batch 模式吗
>
>
>
>
>| |
>邵红晓
>|
>|
>邮箱:17611022...@163.com
>|
>
>签名由 网易邮箱大师 定制
>
>在2020年05月15日 15:05,Jeff 写道:
>hi all,
>最近上线一批新任务后taskmanager频繁挂掉,很可能是OOM问题,操作系统日志里没找到相关记录,flink日志只找到如下部分,但还是不确定是什么原因,请问要怎么确定原因呢?
>
>
>
>
>id, channel, rowtime) -> select: (appid, channel, rowtime, 1 AS $f3) 
>b91d36766995398a9b0c9416ac1fb6bc.
>2020-05-14 08:55:30,504 ERROR org.apache.flink.runtime.taskmanager.Task - Task 
>did not exit gracefully within 180 + seconds.
>2020-05-14 08:55:30,505 ERROR 
>org.apache.flink.runtime.taskexecutor.TaskExecutor - Task did not exit 
>gracefully within 180 + seconds.
>2020-05-14 08:55:30,505 ERROR 
>org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Fatal error occurred 
>while executing the TaskManager. Shutting it down...
>2020-05-14 08:55:30,505 INFO 
>org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopping TaskExecutor 
>akka.tcp://flink@172.24.150.24:48412/user/taskmanager_0.
>2020-05-14 08:55:30,508 INFO 
>org.apache.flink.runtime.taskexecutor.JobLeaderService - Stop job leader 
>service.
>2020-05-14 08:55:30,510 INFO 
>org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Shutting 
>down TaskExecutorLocalStateStoresManager.
>2020-05-14 08:55:30,512 INFO 
>org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed 
>spill file directory /tmp/flink-io-c882cdf4-840c-4c62-800e-0ca3226be20b
>2020-05-14 08:55:30,512 INFO 
>org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down the 
>network environment and its components.
>2020-05-14 08:55:30,514 INFO 
>org.apache.flink.runtime.io.network.netty.NettyClient - Successful shutdown 
>(took 2 ms).
>2020-05-14 08:55:30,517 INFO 
>org.apache.flink.runtime.io.network.netty.NettyServer - Successful shutdown 
>(took 2 ms).
>2020-05-14 08:55:30,545 INFO 
>org.apache.flink.runtime.taskexecutor.JobLeaderService - Stop job leader 
>service.
>2020-05-14 08:55:30,546 INFO org.apache.flink.runtime.filecache.FileCache - 
>removed file cache directory 
>/tmp/flink-dist-cache-3e0d4351-d5aa-4358-a028-9a59f398d92f
>2020-05-14 08:55:30,550 INFO 
>org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopped TaskExecutor 
>akka.tcp://flink@172.24.150.24:48412/user/taskmanager_0.
>2020-05-14 08:55:30,552 INFO org.apache.flink.runtime.blob.PermanentBlobCache 
>- Shutting down BLOB cache
>2020-05-14 08:55:30,554 INFO org.apache.flink.runtime.blob.TransientBlobCache 
>- Shutting down BLOB cache
>2020-05-14 08:55:30,563 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService 
>- Stopping Akka RPC service.
>2020-05-14 08:55:30,566 INFO 
>akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote 
>daemon.
>2020-05-14 08:55:30,567 INFO 
>akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut 
>down; proceeding with flushing remote transports.
>2020-05-14 08:55:30,570 INFO 
>akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote 
>daemon.
>2020-05-14 08:55:30,571 INFO 
>akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut 
>down; proceeding with flushing remote transports.
>2020-05-14 08:55:30,571 INFO org.apache.flink.runtime.taskmanager.Task - 
>Source: KafkaTableSource switched from RUNNING to FAILED.
>java.lang.RuntimeException: segment has been freed
>at 
>org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>at 
>org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>at 
>org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>at 
>org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>at 
>org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>at 
>org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>at 
>org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>at 
>org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>at DataStreamCalcRule$2658.processElement(Unknown Source)
>at 
>org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
>at 
>org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>at 
>org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>at 
>org.

怎么排查taskmanager频繁挂掉的原因?

2020-05-15 Thread Jeff
hi all,
最近上线一批新任务后taskmanager频繁挂掉,很可能是OOM问题,操作系统日志里没找到相关记录,flink日志只找到如下部分,但还是不确定是什么原因,请问要怎么确定原因呢?




id, channel, rowtime) -> select: (appid, channel, rowtime, 1 AS $f3) 
b91d36766995398a9b0c9416ac1fb6bc.
2020-05-14 08:55:30,504 ERROR org.apache.flink.runtime.taskmanager.Task - Task 
did not exit gracefully within 180 + seconds.
2020-05-14 08:55:30,505 ERROR 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Task did not exit 
gracefully within 180 + seconds.
2020-05-14 08:55:30,505 ERROR 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Fatal error occurred 
while executing the TaskManager. Shutting it down...
2020-05-14 08:55:30,505 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
- Stopping TaskExecutor akka.tcp://flink@172.24.150.24:48412/user/taskmanager_0.
2020-05-14 08:55:30,508 INFO 
org.apache.flink.runtime.taskexecutor.JobLeaderService - Stop job leader 
service.
2020-05-14 08:55:30,510 INFO 
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Shutting 
down TaskExecutorLocalStateStoresManager.
2020-05-14 08:55:30,512 INFO 
org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed 
spill file directory /tmp/flink-io-c882cdf4-840c-4c62-800e-0ca3226be20b
2020-05-14 08:55:30,512 INFO 
org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down the 
network environment and its components.
2020-05-14 08:55:30,514 INFO 
org.apache.flink.runtime.io.network.netty.NettyClient - Successful shutdown 
(took 2 ms).
2020-05-14 08:55:30,517 INFO 
org.apache.flink.runtime.io.network.netty.NettyServer - Successful shutdown 
(took 2 ms).
2020-05-14 08:55:30,545 INFO 
org.apache.flink.runtime.taskexecutor.JobLeaderService - Stop job leader 
service.
2020-05-14 08:55:30,546 INFO org.apache.flink.runtime.filecache.FileCache - 
removed file cache directory 
/tmp/flink-dist-cache-3e0d4351-d5aa-4358-a028-9a59f398d92f
2020-05-14 08:55:30,550 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
- Stopped TaskExecutor akka.tcp://flink@172.24.150.24:48412/user/taskmanager_0.
2020-05-14 08:55:30,552 INFO org.apache.flink.runtime.blob.PermanentBlobCache - 
Shutting down BLOB cache
2020-05-14 08:55:30,554 INFO org.apache.flink.runtime.blob.TransientBlobCache - 
Shutting down BLOB cache
2020-05-14 08:55:30,563 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - 
Stopping Akka RPC service.
2020-05-14 08:55:30,566 INFO 
akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote 
daemon.
2020-05-14 08:55:30,567 INFO 
akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut 
down; proceeding with flushing remote transports.
2020-05-14 08:55:30,570 INFO 
akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote 
daemon.
2020-05-14 08:55:30,571 INFO 
akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut 
down; proceeding with flushing remote transports.
2020-05-14 08:55:30,571 INFO org.apache.flink.runtime.taskmanager.Task - 
Source: KafkaTableSource switched from RUNNING to FAILED.
java.lang.RuntimeException: segment has been freed
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at 
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
at 
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
at DataStreamCalcRule$2658.processElement(Unknown Source)
at 
org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
at 
org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at 

有什么方式可以获得各个task占用内存情况呢

2020-05-15 Thread Jeff
hi all,


我用per-job方式提交了一批任务,请问有什么方式知道每个任务内存消耗情况呢?

Re: What's the best practice to determine whether a job has finished or not?

2020-05-08 Thread Jeff Zhang
I use JobListener#onJobExecuted to be notified that the flink job is done.
It is pretty reliable for me, the only exception is the client process is
down.

BTW, the reason you see ApplicationNotFound exception is that yarn app is
terminated which means the flink cluster is shutdown. While for standalone
mode, the flink cluster is always up.


Caizhi Weng  于2020年5月8日周五 下午2:47写道:

> Hi dear Flink community,
>
> I would like to determine whether a job has finished (no matter
> successfully or exceptionally) in my code.
>
> I used to think that JobClient#getJobStatus is a good idea, but I found
> that it behaves quite differently under different executing environments.
> For example, under a standalone session cluster it will return the FINISHED
> status for a finished job, while under a yarn per job cluster it will throw
> a ApplicationNotFound exception. I'm afraid that there might be other
> behaviors for other environments.
>
> So what's the best practice to determine whether a job has finished or
> not? Note that I'm not waiting for the job to finish. If the job hasn't
> finished I would like to know it and do something else.
>


-- 
Best Regards

Jeff Zhang


Overriding hadoop core-site.xml keys using the flink-fs-hadoop-shaded assemblies

2020-05-05 Thread Jeff Henrikson
Has anyone had success overriding hadoop core-site.xml keys using the 
flink-fs-hadoop-shaded assemblies?  If so, what versions were known to work?


Using btrace, I am seeing a bug in the hadoop shaded dependencies 
distributed with 1.10.0.  Some (but not all) of the core-site.xml keys 
cannot be overridden.


Thanks,


Jeff Henrikson


Re: flink-s3-fs-hadoop retry configuration

2020-05-04 Thread Jeff Henrikson

> 2) How can I tell if flink-s3-fs-hadoop is actually managing to pick up
> the hadoop configuration I have provided, as opposed to some separate
> default configuration?

I'm reading the docs and source of flink-fs-hadoop-shaded.  I see that 
core-default-shaded.xml has fs.s3a.connection.maximum set to 15.  I have 
around 20 different DataStreams being instantiated from S3, so if they 
each require one connection to be healthy, then 15 is definitely not a 
good value.


However, I seem to be unable to override fs.s3a.connection.maximum using 
my core-site.xml.  I am also unable to see the DEBUG level messages for 
the shaded flink-fs-hadoop-shaded if I set log4j.rootLogger=DEBUG.


So now I'm wondering:

1) Anybody know how to see DEBUG output for flink-fs-hadoop-shaded?

2) Am I going to end up rebuilding flink-fs-hadoop-shaded to
override the config?


Thanks in advance,


Jeff Henrikson



https://github.com/apache/flink/tree/master/flink-filesystems/flink-fs-hadoop-shaded

https://github.com/apache/flink/blob/master/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/core-default-shaded.xml

  
fs.s3a.connection.maximum
15
Controls the maximum number of simultaneous 
connections to S3.

  




On 5/1/20 7:30 PM, Jeff Henrikson wrote:

Hello Flink users,

I could use help with three related questions:

1) How can I observe retries in the flink-s3-fs-hadoop connector?

2) How can I tell if flink-s3-fs-hadoop is actually managing to pick up
the hadoop configuration I have provided, as opposed to some separate
default configuration?  My job fails quickly when I read larger or more 
numerous objects from S3.  I conjecture the failure may be related to 
insufficient retries when S3 throttles.


3) What s3 fault recovery approach would you recommend?

Background:

I am having trouble with reliable operation of the flink-s3-fs-hadoop 
connector.   My application sources all its DataStream data from S3, and 
appears to get frequently throttled by s3:


     Caused by:
     org.apache.flink.streaming.runtime.tasks.AsynchronousException:
     Caught exception when processing split: [0]
     s3://bucketname/1d/2020/04/15/00-00-00-customers.csv mod@
     1586911084000 : 0 + 33554432
     . . .
     Caused by: java.io.InterruptedIOException: Failed to open
     s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv at 0 on
     s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv:
     com.amazonaws.SdkClientException: Unable to execute HTTP request:
     Timeout waiting for connection from pool

The s3 throttling does not seem to trigger retries and so
causes the job to fail.  For troubleshooting purposes, the job stays up
for much longer if I reduce s3 inputs to my job by disabling functionality.

I see in the documentation for hadoop-aws that there are properties
such as fs.s3.maxRetries fs.s3.sleepTimeSeconds for handling retries
within hadoop.

After wrangling with some classpath troubles, I managed to get
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar to parse a set of hadoop
configuration files {core/hdfs/mapred/yarn}-site.xml.  I can confirm
that the cluster parses the configuration by passing invalid xml and
seeing the cluster crash.

The puzzle with which I am now faced is that the configuration for 
retries and timeouts in core-site.xml seems to have no effect on the

application.

I deploy in kubernetes with a custom docker image.  For now, I have
not enabled the zookeeper-based HA.

See below for a frequent stacktrace that I interpret as likely to be
caused by s3 throttling.

Thanks in advance for any help.

Regards,


Jeff Henrikson



     2020-04-30 19:35:24
     org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
     at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110) 

     at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76) 

     at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) 

     at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186) 

     at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180) 

     at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484) 

     at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380) 

     at 
jdk.internal.reflect.GeneratedMethodAccessor66.invoke(Unknown Source)
     at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 


     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
     at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcI

  1   2   >