Re: Runtime Dependency Issues Upgrading to Flink 1.11.2 from 1.9.2

2020-10-15 Thread Leonard Xu
Hi, Chesnay

> @Leonard I noticed you handled a similar case on the Chinese ML in July;
> do you have any insights?

In the case on the Chinese ML, the user had added jakarta.ws.rs-api-3.0.0-M1.jar to
Flink/lib, which led to dependency conflicts. Hailu's case looks different.

Hi @Hailu,
The Hadoop dependency jersey-core-1.9.jar contains the class
javax.ws.rs.ext.RuntimeDelegate, and the dependency javax.ws.rs:javax.ws.rs-api
in your shaded jar also contains the class javax.ws.rs.ext.RuntimeDelegate.
I suspect the ClassCastException comes from here.
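
One way to confirm a duplicate like this is to scan the jars involved for the class file. The following is a minimal sketch, not something posted in the thread; the function name `jars_containing` and the jar list passed to it are illustrative assumptions.

```python
import zipfile


def jars_containing(class_name, jar_paths):
    """Return the jars, in the given order, that bundle class_name."""
    # A class a.b.C is stored in a jar as the entry a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in jar_paths:
        try:
            with zipfile.ZipFile(jar) as zf:
                if entry in zf.namelist():
                    hits.append(str(jar))
        except (zipfile.BadZipFile, OSError):
            # Skip missing or unreadable files instead of aborting the scan.
            continue
    return hits
```

Calling it with `"javax.ws.rs.ext.RuntimeDelegate"` and the paths of the fat jar and jersey-core-1.9.jar would list every jar that ships the class; more than one hit means the class can be loaded from two places.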

Best
Leonard




> On 10/15/2020 7:51 PM, Hailu, Andreas wrote:
>> Hi Chesnay, no, we haven't changed our Hadoop version. The only changes were
>> the update to the 1.11.2 runtime dependencies listed earlier, as well as
>> compiling with flink-clients in some of our modules, since we had been
>> relying on the transitive dependency. Our 1.9.2 jobs are still able to run
>> just fine, which is interesting.
>>  
>> // ah
>>  
>> From: Chesnay Schepler
>> Sent: Thursday, October 15, 2020 7:34 AM
>> To: Hailu, Andreas [Engineering]; user@flink.apache.org
>> Subject: Re: Runtime Dependency Issues Upgrading to Flink 1.11.2 from 1.9.2
>>  
>> I'm not aware of any Flink module bundling this class. Note that this class 
>> is also bundled in jersey-core (which is also on your classpath), so it 
>> appears that there is a conflict between this jar and your shaded one.
>> Have you changed the Hadoop version you are using or how you provide them to 
>> Flink?
>>  
>> On 10/14/2020 6:56 PM, Hailu, Andreas wrote:
>> Hi team! We’re trying to upgrade our applications from 1.9.2 to 1.11.2. 
>> After re-compiling and updating our runtime dependencies to use 1.11.2, we 
>> see this LinkageError:
>>  
>> Caused by: java.lang.LinkageError: ClassCastException: attempting to cast
>> jar:file:/local/data/scratch/hailua_p2epdlsuat/flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar!/javax/ws/rs/ext/RuntimeDelegate.class
>> to
>> jar:file:/local/data/scratch/hailua_p2epdlsuat/flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar!/javax/ws/rs/ext/RuntimeDelegate.class
>> at javax.ws.rs.ext.RuntimeDelegate.findDelegate(RuntimeDelegate.java:146) ~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
>> at javax.ws.rs.ext.RuntimeDelegate.getInstance(RuntimeDelegate.java:120) ~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
>> at javax.ws.rs.core.MediaType.valueOf(MediaType.java:179) ~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
>> at com.sun.jersey.core.header.MediaTypes.(MediaTypes.java:64) ~[jersey-core-1.9.jar:1.9]
>> at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182) ~[jersey-core-1.9.jar:1.9]
>> at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175) ~[jersey-core-1.9.jar:1.9]
>> at com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162) ~[jersey-core-1.9.jar:1.9]
>> at com.sun.jersey.api.client.Client.init(Client.java:342) ~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
>> at com.sun.jersey.api.client.Client.access$000(Client.java:118) ~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
>> at com.sun.jersey.api.client.Client$1.f(Client.java:191) ~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
>> at com.sun.jersey.api.client.Client$1.f(Client.java:187) ~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
>> at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193) ~[jersey-core-1.9.jar:1.9]
>> at com.sun.jersey.api.client.Client.(Client.java:187) ~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
>> at com.sun.jersey.api.client.Client.(Client.java:170) ~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
>> at org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285) ~[hadoop-yarn-common-2.7.3.2.6.3.0-235.jar:?]
>> at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163) ~[hadoop-common-2.7.3.2.6.3.0-235.jar:?]
>> at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getTimelineDelegationToken(YarnClientImpl.java:355) ~[hadoop-yarn-client-2.7.3.2.6.3.0-235.jar:?]
>> at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.addTimelineDelegationToken(YarnClientImpl.java:331) ~[hadoop-yarn-client-2.7.3.2.6.3.0-235.jar:?]
>> at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.submitApplication(YarnClientImpl.java:250) ~[hadoop-yarn-client-2.7.3.2.6.3.0-235.jar:?]
>> at 
>> 

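Re: Flink job fails to run

2020-10-15 Thread Jeff Zhang
Are you on Hadoop 2? As far as I remember, this only happens with Hadoop 3.


On Fri, Oct 16, 2020 at 11:22 AM, gangzi <1139872...@qq.com> wrote:

> The TM's CLASSPATH indeed does not contain hadoop-mapreduce-client-core.jar.
> Could this be a problem with the Hadoop cluster? Or do I have to use the
> shaded Hadoop jar? Officially, the shaded Hadoop jar is no longer recommended.
>
> > On Oct 16, 2020, at 10:50 AM, Jeff Zhang wrote:
> >
> > Take a look at the TM log; it lists the CLASSPATH.
> >
> > On Fri, Oct 16, 2020 at 10:11 AM, gangzi <1139872...@qq.com> wrote:
> >
> >> Following the official Flink documentation, I ran
> >> export HADOOP_CLASSPATH=`hadoop classpath` on every node of the Hadoop
> >> cluster, but I get: java.lang.NoClassDefFoundError:
> >> org/apache/hadoop/mapred/JobConf
> >>
> >> I don't know whether this is a Flink bug. Judging from the error, the jar
> >> hadoop-mapreduce-client-core.jar is missing, but that jar lives under
> >> /usr/local/hadoop-2.10.0/share/hadoop/mapreduce/*, and that directory is
> >> included in HADOOP_CLASSPATH, so in principle it should be loaded.
> >>
> >>> On Oct 16, 2020, at 9:59 AM, Shubin Ruan wrote:
> >>>
> >>> export HADOOP_CLASSPATH=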
> >>
> >>
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
>
>

-- 
Best Regards

Jeff Zhang


Re: Custom Flink UDF cannot be used after registration

2020-10-15 Thread 史 正超
Hi, judging from the log, no UDF matching imei_encrypt could be found. The fields passed in the SQL may not match the parameters of imei_encrypt.
Could you share your actual code and the UDF declaration?

From: 奔跑的小飞袁
Sent: October 16, 2020 3:30
To: user-zh@flink.apache.org
Subject: Custom Flink UDF cannot be used after registration

Hello,
I got the following error when registering a UDF with Flink SQL. Is something wrong with my definition, or is this a Flink bug?
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. From line 11, column 6 to line 11, column 23: No match found for function signature imei_encrypt()
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 11, column 6 to line 11, column 23: No match found for function signature imei_encrypt()
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:106)
at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:81)
at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:61)
at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 11, column 6 to line 11, column 23: No match found for function signature imei_encrypt()
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
  

Custom Flink UDF cannot be used after registration

2020-10-15 Thread 奔跑的小飞袁
Hello,
I got the following error when registering a UDF with Flink SQL. Is something wrong with my definition, or is this a Flink bug?
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. From line 11, column 6 to line 11, column 23: No match found for function signature imei_encrypt()
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 11, column 6 to line 11, column 23: No match found for function signature imei_encrypt()
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:106)
at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:81)
at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:61)
at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 11, column 6 to line 11, column 23: No match found for function signature imei_encrypt()
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
at

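Re: Flink job fails to run

2020-10-15 Thread gangzi
The TM's CLASSPATH indeed does not contain hadoop-mapreduce-client-core.jar.
Could this be a problem with the Hadoop cluster? Or do I have to use the
shaded Hadoop jar? Officially, the shaded Hadoop jar is no longer recommended.

> On Oct 16, 2020, at 10:50 AM, Jeff Zhang wrote:
>
> Take a look at the TM log; it lists the CLASSPATH.
>
> On Fri, Oct 16, 2020 at 10:11 AM, gangzi <1139872...@qq.com> wrote:
>
>> Following the official Flink documentation, I ran
>> export HADOOP_CLASSPATH=`hadoop classpath` on every node of the Hadoop
>> cluster, but I get: java.lang.NoClassDefFoundError:
>> org/apache/hadoop/mapred/JobConf
>>
>> I don't know whether this is a Flink bug. Judging from the error, the jar
>> hadoop-mapreduce-client-core.jar is missing, but that jar lives under
>> /usr/local/hadoop-2.10.0/share/hadoop/mapreduce/*, and that directory is
>> included in HADOOP_CLASSPATH, so in principle it should be loaded.
>>
>>> On Oct 16, 2020, at 9:59 AM, Shubin Ruan wrote:
>>>
>>> export HADOOP_CLASSPATH=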
>> 
>> 
> 
> -- 
> Best Regards
> 
> Jeff Zhang



Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

2020-10-15 Thread Yang Wang
I am afraid the InetAddress cache will not take effect, because Kubernetes
only creates A and SRV records for Services. It does not generate A records
for pods as you might expect. See [1][2] for more information. So the DNS
reverse lookup will always fail. IIRC, the default timeout is 5s. This could
explain the delay of "getHostName" or "getFqdnHostName".

I agree that we should add a config option to disable the DNS reverse
lookup.
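
The two ideas discussed in this thread (a switch that disables the reverse lookup, and resolving lazily so the cost is paid at most once and only when the name is needed) can be sketched as follows. This is an illustration in Python, not Flink code; the class name `LazyHostName` and the `reverse_lookup` flag are hypothetical and do not correspond to any existing Flink option.

```python
import socket


class LazyHostName:
    """Resolve the host name for an IP at most once, and only on first use."""

    def __init__(self, ip, reverse_lookup=True, resolver=socket.getfqdn):
        self._ip = ip
        self._reverse_lookup = reverse_lookup  # hypothetical on/off switch
        self._resolver = resolver              # injectable for testing
        self._cached = None

    def get(self):
        if self._cached is None:
            if self._reverse_lookup:
                # May block for seconds when DNS has no record for the IP.
                self._cached = self._resolver(self._ip)
            else:
                # Lookup disabled: fall back to the textual IP address.
                self._cached = self._ip
        return self._cached
```

Constructing the object never touches DNS; only the first `get()` call does, and only when `reverse_lookup` is left enabled.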


[1].
https://kubernetes.io/docs/tasks/administer-cluster/dns-custom-nameservers/#coredns-configmap-options
[2].
https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/#a--records-1


Best,
Yang

On Thu, Oct 15, 2020 at 8:41 PM, Chesnay Schepler wrote:

> The InetAddress caches the result of getCanonicalHostName(), so it is not
> a problem to call it twice.
>
> On 10/15/2020 1:57 PM, Till Rohrmann wrote:
>
> Hi Weike,
>
> thanks for getting back to us with your findings. Looking at the
> `TaskManagerLocation`, we are actually calling
> `InetAddress.getCanonicalHostName` twice for every creation of a
> `TaskManagerLocation` instance. This does not look right.
>
> I think it should be fine to make the lookup configurable. Moreover, one
> could think about only doing a lazy lookup if the canonical hostname is
> really needed (as far as I can see, it is only really needed for input split
> assignments and for the LocationPreferenceSlotSelectionStrategy to
> calculate how many TMs run on the same machine).
>
> Do you want to fix this issue?
>
> Cheers,
> Till
>
> On Thu, Oct 15, 2020 at 11:38 AM DONG, Weike 
> wrote:
>
>> Hi Till and community,
>>
>> By the way, initially I resolved the IPs several times but results
>> returned rather quickly (less than 1ms, possibly due to DNS cache on the
>> server), so I thought it might not be the DNS issue.
>>
>> However, after debugging and logging, we found that the lookup time
>> exhibited high variance, i.e. normally it completes fast but occasionally
>> some slow results would block the thread. So an unstable DNS server might
>> have a great impact on the performance of Flink job startup.
>>
>> Best,
>> Weike
>>
>> On Thu, Oct 15, 2020 at 5:19 PM DONG, Weike 
>> wrote:
>>
>>> Hi Till and community,
>>>
>>> Increasing `kubernetes.jobmanager.cpu` in the configuration alleviates
>>> this issue but does not eliminate it.
>>>
>>> After adding DEBUG logs to the internals of *flink-runtime*, we have
>>> found the culprit is
>>>
>>> inetAddress.getCanonicalHostName()
>>>
>>> in
>>> *org.apache.flink.runtime.taskmanager.TaskManagerLocation#getHostName*
>>> and
>>> *org.apache.flink.runtime.taskmanager.TaskManagerLocation#getFqdnHostName*,
>>> which could take ~ 6 seconds to complete, thus Akka dispatcher(s)
>>> are severely blocked by that.
>>>
>>> By commenting out the two methods, this issue seems to be solved
>>> immediately, so I wonder if Flink could provide a configuration parameter
>>> to turn off the DNS reverse lookup process, as it seems that Flink jobs
>>> could run happily without it.
>>>
>>> Sincerely,
>>> Weike
>>>
>>>
>>> On Tue, Oct 13, 2020 at 6:52 PM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Weike,
>>>>
>>>> could you try setting kubernetes.jobmanager.cpu: 4 in your
>>>> flink-conf.yaml? I fear that a single CPU is too low for the JobManager
>>>> component.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Oct 13, 2020 at 11:33 AM Till Rohrmann
>>>> wrote:
>>>>
>>>>> Hi Weike,
>>>>>
>>>>> thanks for posting the logs. I will take a look at them. My suspicion
>>>>> would be that there is some operation blocking the JobMaster's main thread
>>>>> which causes the registrations from the TMs to time out. Maybe the logs
>>>>> allow me to validate/falsify this suspicion.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Mon, Oct 12, 2020 at 10:43 AM DONG, Weike
>>>>> wrote:
>>>>>
>>>>>> Hi community,
>>>>>>
>>>>>> I have uploaded the log files of JobManager and TaskManager-1-1 (one
>>>>>> of the 50 TaskManagers) with DEBUG log level and default Flink
>>>>>> configuration, and it clearly shows that TaskManager failed to register
>>>>>> with JobManager after 10 attempts.
>>>>>>
>>>>>> Here is the link:
>>>>>>
>>>>>> JobManager:
>>>>>> https://gist.github.com/kylemeow/740c470d9b5a1ab3552376193920adce
>>>>>>
>>>>>> TaskManager-1-1:
>>>>>> https://gist.github.com/kylemeow/41b9a8fe91975875c40afaf58276c2fe
>>>>>>
>>>>>> Thanks : )
>>>>>>
>>>>>> Best regards,
>>>>>> Weike
>>>>>>
>>>>>>
>>>>>> On Mon, Oct 12, 2020 at 4:14 PM DONG, Weike
>>>>>> wrote:
>>>>>>
>>>>>>> Hi community,
>>>>>>>
>>>>>>> Recently we have noticed a strange behavior for Flink jobs on
>>>>>>> Kubernetes per-job mode: when the parallelism increases, the time it
>>>>>>> takes for the TaskManagers to register with *JobManager* becomes
>>>>>>> abnormally long (for a task with parallelism of 50, it could take
>>>>>>> 60 ~ 120 seconds or even longer for the registration attempt), and
>>>>>>> usually more than 10 attempts are needed to finish this

Re: Flink job fails to run

2020-10-15 Thread Jeff Zhang
Take a look at the TM log; it lists the CLASSPATH.

On Fri, Oct 16, 2020 at 10:11 AM, gangzi <1139872...@qq.com> wrote:

> Following the official Flink documentation, I ran
> export HADOOP_CLASSPATH=`hadoop classpath` on every node of the Hadoop
> cluster, but I get: java.lang.NoClassDefFoundError:
> org/apache/hadoop/mapred/JobConf
>
> I don't know whether this is a Flink bug. Judging from the error, the jar
> hadoop-mapreduce-client-core.jar is missing, but that jar lives under
> /usr/local/hadoop-2.10.0/share/hadoop/mapreduce/*, and that directory is
> included in HADOOP_CLASSPATH, so in principle it should be loaded.
>
> > On Oct 16, 2020, at 9:59 AM, Shubin Ruan wrote:
> >
> > export HADOOP_CLASSPATH=
>
>

-- 
Best Regards

Jeff Zhang


Re:Re: PyFlink SQL: select and where both use a UDF, and one of the UDFs stops working

2020-10-15 Thread whh_960101
I extracted part of the plan, at the point where the data is filtered:

== Abstract Syntax Tree ==
+- LogicalFilter(condition=[error_exist($1)])

== Optimized Logical Plan ==
      +- PythonCalc(select=[message, kubernetes, clusterName, error_exist(message) AS f0])
# I suspect this is where things go wrong: this should not be a select but a where or filter, since the plan above already has LogicalFilter(condition=[error_exist($1)])

== Physical Execution Plan ==
  Stage 3 : Operator
   content : StreamExecPythonCalc
   ship_strategy : FORWARD

At 2020-10-15 20:59:12, "Dian Fu" wrote:
>Could you print the plan so we can take a look? For how to print the plan, see: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html#explain-tables
>
>> On Oct 15, 2020, at 7:02 PM, whh_960101 wrote:
>>
>> Hi,
>> I just modified your example [1] to build a source table with from_elements
>> and then used my UDFs:
>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>> The printed result correctly contains only the data I want.
>>
>> However, when I build the source table through StreamTableEnvironment by
>> connecting to Kafka and consuming the message queue, as I did before:
>> source = st_env.from_path('source')
>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>>
>> the where filter stops working and all the data is printed.
>>
>> When the UDF is used only in where, i.e.
>> source.where(udf1(msg)=True).select("msg").execute_insert('sink').get_job_client().get_job_execution_result().result()
>> the printed result is filtered.
>>
>> Could you tell me where this problem comes from?
>>
>> At 2020-10-15 16:57:39, "Xingbo Huang" wrote:
>>> Hi,
>>> We have an example that covers your case [1]. You need to check what the logic of your udf1 looks like.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67
>>>
>>> Best,
>>> Xingbo
>>>
>>> On Thu, Oct 15, 2020 at 2:30 PM, whh_960101 wrote:
>>>
>>>> Hello, my PyFlink code is below, and I have the following problem:
>>>>
>>>> source = st_env.from_path('source')
>>>> # st_env is a StreamTableEnvironment; source is the Kafka source
>>>> # udf1 is used only in the where clause (input_types=DataTypes.STRING(), result_type=DataTypes.BOOLEAN())
>>>> table = source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>>>
>>>> The result printed this way is filtered correctly.
>>>>
>>>> But if I add another UDF, udf2 (input_types=DataTypes.STRING(), result_type=DataTypes.STRING()), to the select clause (it simply turns msg into a new String):
>>>> table = source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>>> the where filter stops working and all the data is printed.
>>>>
>>>> Putting where first does not help, and neither does switching to filter:
>>>> table = source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>>>>
>>>> Do the UDFs in select and where conflict with each other? How can this be solved?
>>>> I hope you can help. Thank you!


Re:Re: PyFlink SQL: select and where both use a UDF, and one of the UDFs stops working

2020-10-15 Thread whh_960101
== Abstract Syntax Tree ==
LogicalProject(_c0=[log_get($1)], _c1=[_UTF-16LE''], _c2=[_UTF-16LE''], _c3=[_UTF-16LE'ERROR'], _c4=[_UTF-16LE'Asia/Shanghai'], _c5=[_UTF-16LE'@timestamp'], kubernetes$container$name=[$3.container.name], clusterName=[$2])
+- LogicalFilter(condition=[error_exist($1)])
   +- LogicalTableScan(table=[[default_catalog, default_database, source, source: [KafkaTableSource(@timestamp, message, clusterName, kubernetes)]]])

== Optimized Logical Plan ==
Calc(select=[f00 AS _c0, _UTF-16LE'' AS _c1, _UTF-16LE'' AS _c2, _UTF-16LE'ERROR' AS _c3, _UTF-16LE'Asia/Shanghai' AS _c4, _UTF-16LE'@timestamp' AS _c5, f0 AS kubernetes$container$name, clusterName])
+- PythonCalc(select=[f0, clusterName, log_get(message) AS f00])
   +- Calc(select=[message, clusterName, kubernetes.container.name AS f0])
      +- PythonCalc(select=[message, kubernetes, clusterName, error_exist(message) AS f0])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, source, source: [KafkaTableSource(@timestamp, message, clusterName, kubernetes)]]], fields=[@timestamp, message, clusterName, kubernetes])

== Physical Execution Plan ==
Stage 1 : Data Source
 content : Source: KafkaTableSource(@timestamp, message, clusterName, kubernetes)

 Stage 2 : Operator
  content : SourceConversion(table=[default_catalog.default_database.source, source: [KafkaTableSource(@timestamp, message, clusterName, kubernetes)]], fields=[@timestamp, message, clusterName, kubernetes])
  ship_strategy : FORWARD

  Stage 3 : Operator
   content : StreamExecPythonCalc
   ship_strategy : FORWARD

   Stage 4 : Operator
    content : Calc(select=[message, clusterName, kubernetes.container.name AS f0])
    ship_strategy : FORWARD

    Stage 5 : Operator
     content : StreamExecPythonCalc
     ship_strategy : FORWARD

     Stage 6 : Operator
      content : Calc(select=[f00 AS _c0, _UTF-16LE'' AS _c1, _UTF-16LE'' AS _c2, _UTF-16LE'ERROR' AS _c3, _UTF-16LE'Asia/Shanghai' AS _c4, _UTF-16LE'@timestamp' AS _c5, f0 AS kubernetes$container$name, clusterName])
      ship_strategy : FORWARD
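
In the plan above, the Abstract Syntax Tree has a LogicalFilter, but no node in the Optimized Logical Plan carries a filter condition. A check like this can be automated on the text returned by explain. The sketch below is plain Python and makes one assumption that should be verified against your Flink version: that a filter kept by the optimizer shows up as `where=[...]` on a Calc node. The helper names `plan_sections` and `filter_survives` are hypothetical.

```python
import re


def plan_sections(explain_text):
    """Split explain output into {section title: section body}."""
    sections = {}
    title = None
    for line in explain_text.splitlines():
        header = re.fullmatch(r"==\s*(.+?)\s*==", line.strip())
        if header:
            title = header.group(1)
            sections[title] = []
        elif title is not None:
            sections[title].append(line)
    return {name: "\n".join(body) for name, body in sections.items()}


def filter_survives(explain_text):
    """False if the AST has a LogicalFilter but the optimized plan has no where=[...]."""
    sections = plan_sections(explain_text)
    has_filter = "LogicalFilter(" in sections.get("Abstract Syntax Tree", "")
    optimized = sections.get("Optimized Logical Plan", "")
    # Assumption: a retained filter is printed as where=[...] in the optimized plan.
    return (not has_filter) or ("where=[" in optimized)
```

A result of `False` only signals that the filter is not visible in the optimized plan text; it does not say why the planner dropped it.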

At 2020-10-15 20:59:12, "Dian Fu" wrote:
>Could you print the plan so we can take a look? For how to print the plan, see: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html#explain-tables
>
>> On Oct 15, 2020, at 7:02 PM, whh_960101 wrote:
>>
>> Hi,
>> I just modified your example [1] to build a source table with from_elements
>> and then used my UDFs:
>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>> The printed result correctly contains only the data I want.
>>
>> However, when I build the source table through StreamTableEnvironment by
>> connecting to Kafka and consuming the message queue, as I did before:
>> source = st_env.from_path('source')
>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>>
>> the where filter stops working and all the data is printed.
>>
>> When the UDF is used only in where, i.e.
>> source.where(udf1(msg)=True).select("msg").execute_insert('sink').get_job_client().get_job_execution_result().result()
>> the printed result is filtered.
>>
>> Could you tell me where this problem comes from?
>>
>> At 2020-10-15 16:57:39, "Xingbo Huang" wrote:
>>> Hi,
>>> We have an example that covers your case [1]. You need to check what the logic of your udf1 looks like.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67
>>>
>>> Best,
>>> Xingbo
>>>
>>> On Thu, Oct 15, 2020 at 2:30 PM, whh_960101 wrote:
>>>
>>>> Hello, my PyFlink code is below, and I have the following problem:
>>>>
>>>> source = st_env.from_path('source')
>>>> # st_env is a StreamTableEnvironment; source is the Kafka source
>>>> # udf1 is used only in the where clause (input_types=DataTypes.STRING(), result_type=DataTypes.BOOLEAN())
>>>> table = source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>>>
>>>> The result printed this way is filtered correctly.
>>>>
>>>> But if I add another UDF, udf2 (input_types=DataTypes.STRING(), result_type=DataTypes.STRING()), to the select clause (it simply turns msg into a new String):
>>>> table = source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>>> the where filter stops working and all the data is printed.
>>>>
>>>> Putting where first does not help, and neither does switching to filter:
>>>> table = source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>>>>
>>>> Do the UDFs in select and where conflict with each other? How can this be solved?
>>>> I hope you can help. Thank you!


Re: Flink job fails to run

2020-10-15 Thread gangzi
Following the official Flink documentation, I ran export HADOOP_CLASSPATH=`hadoop classpath`
on every node of the Hadoop cluster, but I get: java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/JobConf
I don't know whether this is a Flink bug. Judging from the error, the jar hadoop-mapreduce-client-core.jar is missing, but that jar lives under /usr/local/hadoop-2.10.0/share/hadoop/mapreduce/*, and that directory is included in HADOOP_CLASSPATH, so in principle it should be loaded.

> On Oct 16, 2020, at 9:59 AM, Shubin Ruan wrote:
>
> export HADOOP_CLASSPATH=
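
To check whether a classpath string such as the one printed in the TM log actually provides a class, the wildcard entries can be expanded and the jars scanned. This is a minimal sketch under stated assumptions: the helper names `expand_classpath` and `find_class` are hypothetical, only lower-case `.jar` files are matched, and directories of loose `.class` files are ignored.

```python
import glob
import os
import zipfile


def expand_classpath(classpath):
    """Expand a Java-style classpath string into concrete entries.

    An entry ending in '*' is treated as every .jar file in that directory.
    """
    entries = []
    for entry in classpath.split(os.pathsep):
        if not entry:
            continue
        if entry.endswith("*"):
            directory = entry[:-1]
            entries.extend(sorted(glob.glob(os.path.join(directory, "*.jar"))))
        else:
            entries.append(entry)
    return entries


def find_class(classpath, class_name):
    """Return the jars on the classpath that contain class_name."""
    resource = class_name.replace(".", "/") + ".class"
    found = []
    for entry in expand_classpath(classpath):
        if entry.endswith(".jar") and os.path.isfile(entry):
            with zipfile.ZipFile(entry) as jar:
                if resource in jar.namelist():
                    found.append(entry)
    return found
```

Running `find_class(os.environ["HADOOP_CLASSPATH"], "org.apache.hadoop.mapred.JobConf")` in the same environment as the failing process would show whether any jar on that classpath provides the class; an empty list means the classpath seen by that process does not include it.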



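Re: Flink job fails to run

2020-10-15 Thread Shubin Ruan
Try running the following command on every node of the cluster:


export HADOOP_CLASSPATH=


Then submit the job.

At 2020-10-15 22:05:43, "gangzi" <1139872...@qq.com> wrote:
>I have a question: after submitting a job with flink-1.11.1 in YARN per-job mode, the following exception was thrown: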
>java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/JobConf
>at java.lang.Class.getDeclaredMethods0(Native Method)
>at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
>at java.lang.Class.getDeclaredMethod(Class.java:2128)
>at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
>at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
>at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
>at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
>at java.security.AccessController.doPrivileged(Native Method)
>at java.io.ObjectStreamClass.(ObjectStreamClass.java:494)
>at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
>at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
>at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
>at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>   

Table SQL and updating rows

2020-10-15 Thread Dan Hill
*Context*
I'm working on a user event logging system using Flink.   I'm tracking some
info that belongs to the user's current session (e.g. location, device,
experiment info).  I don't have a v1 requirement to support mutability but
I want to plan for it.  I think the most likely reason for mutation will be
that different parts of the system update session info (e.g. location might
not be accessible until later if the user does not give permission for it
initially).

*Ideas*
1) Introduce an aggregation step.  E.g. key by sessionId and run a
TableAggregateFunction.
2) Split the session into smaller parts that can only be inserted once.  Do
a join to create a combined session.

How have other people solved this with Flink Table SQL?  If I was using
DataStream directly, I'd group by and have a map function that uses key
state to keep track of the latest values.


??????????????????????????????????????

2020-10-15 Thread ??????
??aggregateFunction??



??2020??10??15?? 15:47?? ??
Hi,All


??flink??1??12:00~12:59??flink
??
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  .timeWindow(Time.hours(1))
 


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-15 Thread Kostas Kloudas
@Arvid Heise I also do not remember exactly what all the problems
were. The fact that we added some more bulk formats to the
streaming file sink definitely reduced the number of unsupported
features. In addition, the latest discussion I found on the topic was
[1], and the conclusion of that discussion seems to be to remove it.

Currently, I cannot find any obvious reason for keeping the
BucketingSink, apart from the fact that we unfortunately do not have a
migration plan. This is why I posted this to dev@ and user@.

Cheers,
Kostas

[1] 
https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E

On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:
>
> I remember this conversation popping up a few times already and I'm in
> general a big fan of removing BucketingSink.
>
> However, until now there were a few features lacking in StreamingFileSink
> that are present in BucketingSink and that are being actively used (I can't
> exactly remember them now, but I can look it up if everyone else is also
> suffering from bad memory). Did we manage to add them in the meantime? If
> not, then it feels rushed to remove it at this point.
>
> On Tue, Oct 13, 2020 at 2:33 PM Kostas Kloudas  wrote:
>
> > @Chesnay Schepler  Off the top of my head, I cannot find an easy way
> > to migrate from the BucketingSink to the StreamingFileSink. It may be
> > possible but it will require some effort because the logic would be
> > "read the old state, commit it, and start fresh with the
> > StreamingFileSink."
> >
> > On Tue, Oct 13, 2020 at 2:09 PM Aljoscha Krettek 
> > wrote:
> > >
> > > On 13.10.20 14:01, David Anderson wrote:
> > > > I thought this was waiting on FLIP-46 -- Graceful Shutdown Handling --
> > and
> > > > in fact, the StreamingFileSink is mentioned in that FLIP as a
> > motivating
> > > > use case.
> > >
> > > Ah yes, I see FLIP-147 as a more general replacement for FLIP-46. Thanks
> > > for the reminder, we should close FLIP-46 now with an explanatory
> > > message to avoid confusion.
> >
>
>
> --
>
> Arvid Heise | Senior Java Developer


Re: Runtime Dependency Issues Upgrading to Flink 1.11.2 from 1.9.2

2020-10-15 Thread Chesnay Schepler
Can you provide us with the classpath (found at the top of the log file) 
for your 1.9.2/1.11.2 setups?
(to see whether something has changed with regard to classpath ordering)

It would also be good to know what was copied from opt/ to lib/ 
or plugins/.
(to see whether some packaging change might affect you)

Is the stack trace from the attempt where you set rs-api to compileOnly, 
or from a previous attempt? (I assume you use Gradle?)

Was the error the same in both cases?

@Leonard I noticed you handled a similar case on the Chinese ML in July; 
do you have any insights?


On 10/15/2020 7:51 PM, Hailu, Andreas wrote:


Hi Chesnay, no, we haven’t changed our Hadoop version. The only 
changes were the update to the 1.11.2 runtime dependencies listed 
earlier, as well as compiling with flink-clients in some of our 
modules since we were relying on the transitive dependency. Our 1.9.2 
jobs are still able to run just fine, which is interesting.


// ah

*From:*Chesnay Schepler 
*Sent:* Thursday, October 15, 2020 7:34 AM
*To:* Hailu, Andreas [Engineering] ; 
user@flink.apache.org
*Subject:* Re: Runtime Dependency Issues Upgrading to Flink 1.11.2 
from 1.9.2


I'm not aware of any Flink module bundling this class. Note that this 
class is also bundled in jersey-core (which is also on your 
classpath), so it appears that there is a conflict between this jar 
and your shaded one.


Have you changed the Hadoop version you are using or how you provide 
them to Flink?


On 10/14/2020 6:56 PM, Hailu, Andreas wrote:

Hi team! We’re trying to upgrade our applications from 1.9.2 to
1.11.2. After re-compiling and updating our runtime dependencies
to use 1.11.2, we see this LinkageError:

Caused by: java.lang.LinkageError: ClassCastException: attempting
to cast

jar:file:/local/data/scratch/hailua_p2epdlsuat/flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar!/javax/ws/rs/ext/RuntimeDelegate.class


to

jar:file:/local/data/scratch/hailua_p2epdlsuat/flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar!/javax/ws/rs/ext/RuntimeDelegate.class

    at
javax.ws.rs.ext.RuntimeDelegate.findDelegate(RuntimeDelegate.java:146)
~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]

    at
javax.ws.rs.ext.RuntimeDelegate.getInstance(RuntimeDelegate.java:120)
~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]

    at javax.ws.rs.core.MediaType.valueOf(MediaType.java:179)
~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]

    at
com.sun.jersey.core.header.MediaTypes.<clinit>(MediaTypes.java:64)
~[jersey-core-1.9.jar:1.9]

    at

com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
~[jersey-core-1.9.jar:1.9]

    at

com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
~[jersey-core-1.9.jar:1.9]

    at

com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
~[jersey-core-1.9.jar:1.9]

    at com.sun.jersey.api.client.Client.init(Client.java:342)
~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]

    at
com.sun.jersey.api.client.Client.access$000(Client.java:118)
~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]

    at com.sun.jersey.api.client.Client$1.f(Client.java:191)
~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]

    at com.sun.jersey.api.client.Client$1.f(Client.java:187)
~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]

    at
com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
~[jersey-core-1.9.jar:1.9]

    at
com.sun.jersey.api.client.Client.<init>(Client.java:187)
~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]

    at
com.sun.jersey.api.client.Client.<init>(Client.java:170)
~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]

    at

org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285)
~[hadoop-yarn-common-2.7.3.2.6.3.0-235.jar:?]

    at
org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
~[hadoop-common-2.7.3.2.6.3.0-235.jar:?]

    at

org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getTimelineDelegationToken(YarnClientImpl.java:355)
~[hadoop-yarn-client-2.7.3.2.6.3.0-235.jar:?]

    at

org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.addTimelineDelegationToken(YarnClientImpl.java:331)
~[hadoop-yarn-client-2.7.3.2.6.3.0-235.jar:?]

    at

org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.submitApplication(YarnClientImpl.java:250)
~[hadoop-yarn-client-2.7.3.2.6.3.0-235.jar:?]

    at


Re: Broadcasting control messages to a sink

2020-10-15 Thread Jaffe, Julian
Hey AJ,

I’m not familiar with the stock parquet sink, but if it requires a schema on 
creation you won’t be able to change the output schema without restarting the 
job. I’m using a custom sink that can update the schema it uses. The problem 
I’m facing is how to communicate those updates efficiently. Currently 
I’m checking every record for differences with the stored schema, which adds 
a lot of overhead and creates discrepancies between when partitions update the 
schema they use to write and when a schema can be marked as “in use”.

For your use case, the analogous approach would be to replace the Parquet sink 
with a custom sink that manages the lifecycle of the underlying parquet writer 
itself. Then you could control closing the current writer and creating a new 
one with an updated schema yourself, and thus do it in code instead of via a 
restart.
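
A rough sketch of that idea, in plain Java and without any Flink or Parquet dependency. Every name here (SchemaRollingWriter, RecordWriter, updateSchema) is made up for illustration, and the factory stands in for whatever actually creates the Parquet writer:

```java
import java.util.function.Function;

/**
 * Sketch only: owns the lifecycle of an underlying writer and replaces it
 * when the schema changes. All names in this class are hypothetical.
 */
public class SchemaRollingWriter<S, R> {

    /** Stand-in for a file writer that is bound to a single schema. */
    public interface RecordWriter<T> {
        void write(T record) throws Exception;

        void close() throws Exception;
    }

    private final Function<S, RecordWriter<R>> writerFactory;
    private S currentSchema;
    private RecordWriter<R> currentWriter;

    public SchemaRollingWriter(Function<S, RecordWriter<R>> writerFactory) {
        this.writerFactory = writerFactory;
    }

    /** Call when a schema update arrives; closes the writer opened with the old schema. */
    public void updateSchema(S newSchema) throws Exception {
        if (newSchema.equals(currentSchema)) {
            return; // nothing changed, keep the open writer
        }
        closeCurrentWriter();
        currentSchema = newSchema;
    }

    /** Writes with the current schema, opening a writer lazily on first use. */
    public void write(R record) throws Exception {
        if (currentSchema == null) {
            throw new IllegalStateException("no schema has been set yet");
        }
        if (currentWriter == null) {
            currentWriter = writerFactory.apply(currentSchema);
        }
        currentWriter.write(record);
    }

    public void close() throws Exception {
        closeCurrentWriter();
    }

    private void closeCurrentWriter() throws Exception {
        if (currentWriter != null) {
            currentWriter.close();
            currentWriter = null;
        }
    }
}
```

The sketch leaves out everything a real sink would need around checkpointing and file commits; it only shows the writer being swapped in code rather than by a job restart.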

Julian

From: aj 
Date: Thursday, October 15, 2020 at 4:12 AM
To: "Jaffe, Julian" 
Cc: Piotr Nowojski , user 
Subject: Re: Broadcasting control messages to a sink

Hi Jaffe,

I am also working on a similar problem.

I am receiving a set of events in Avro format on different topics. I want to 
consume these and write them to S3 in Parquet format.
I have written a job that creates a separate stream for each event and 
fetches its schema from the Confluent Schema Registry to create a Parquet sink 
for that event.
This is working fine, but the problem is that whenever a new event 
starts arriving or a schema changes, I have to update the YAML config 
and restart the job. Is there any way to have the job start consuming a new 
set of events without a restart?

I see you are handling schema evolution. Can you help me with this, and also 
with how to handle new events? In the config, I keep a mapping of events 
to schema subjects. Please share how you are solving this.


This is how I am currently doing it, but I would like to know a better way to 
handle it.

// Type parameters restored as GenericRecord, which is what forGenericRecord below implies.
FlinkKafkaConsumer<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topics,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        properties);

DataStream<GenericRecord> dataStream =
        streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");

try {
    for (EventConfig eventConfig : eventTypesList) {

        // Assuming LOG is an SLF4J-style logger: without the "{}" placeholder
        // the event name is never written to the log.
        LOG.info("creating a stream for {}", eventConfig.getEvent_name());

        // One Parquet sink per event type, using the schema registered for that event.
        final StreamingFileSink<GenericRecord> sink = StreamingFileSink.forBulkFormat(
                path,
                ParquetAvroWriters.forGenericRecord(
                        SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();

        // Keep only the records that belong to this event type.
        DataStream<GenericRecord> outStream = dataStream.filter(
                (FilterFunction<GenericRecord>) genericRecord ->
                        genericRecord != null
                                && genericRecord.get(EVENT_NAME).toString()
                                        .equals(eventConfig.getEvent_name()));

        outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);
    }
} catch (Exception e) {
    e.printStackTrace();
}

On Wed, Oct 14, 2020, 23:12 Jaffe, Julian <julianja...@activision.com> wrote:
Thanks for the suggestion Piotr!

The problem is that the sink needs to have access to the schema (so that it can 
write the schema only once per file instead of once per record) and thus needs to know 
when the schema has been updated. In this proposed architecture, I think the 
sink would still need to check each record to see if the current schema matches 
the new record or not? The main problem I encountered when playing around with 
broadcast state was that I couldn’t figure out how to access the broadcast 
state within the sink, but perhaps I just haven’t thought about it the right 
way. I’ll meditate on the docs further  

Julian

From: Piotr Nowojski <pnowoj...@apache.org>
Date: Wednesday, October 14, 2020 at 6:35 AM
To: "Jaffe, Julian" <julianja...@activision.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Broadcasting control messages to a sink

Hi Julian,

Have you seen Broadcast State [1]? I have never used it personally, but it 
sounds like something you want. Maybe your job should look like:

1. read raw messages from Kafka, without using the schema
2. read schema changes and broadcast them to 3. and 5.
3. deserialize kafka records in BroadcastProcessFunction by using combined 1. 
and 2.
4. do your logic o
5. serialize records using schema in another BroadcastProcessFunction by using 
combined 4. and 2.
6. write raw records using BucketingSink
?

Best,
Piotrek

[1] 

RE: Runtime Dependency Issues Upgrading to Flink 1.11.2 from 1.9.2

2020-10-15 Thread Hailu, Andreas
Hi Chesnay, no, we haven't changed our Hadoop version. The only changes were 
the update to the 1.11.2 runtime dependencies listed earlier, as well as compiling 
with flink-clients in some of our modules since we were relying on the 
transitive dependency. Our 1.9.2 jobs are still able to run just fine, which is 
interesting.

// ah

From: Chesnay Schepler 
Sent: Thursday, October 15, 2020 7:34 AM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: Runtime Dependency Issues Upgrading to Flink 1.11.2 from 1.9.2

I'm not aware of any Flink module bundling this class. Note that this class is 
also bundled in jersey-core (which is also on your classpath), so it appears 
that there is a conflict between this jar and your shaded one.
Have you changed the Hadoop version you are using or how you provide them to 
Flink?
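
One way to check for such duplicates from inside a JVM is to ask a class loader for every location that provides the class file. The following is only a sketch: ClasspathCopies and copiesOf are names made up here, while ClassLoader.getResources is standard JDK API. It reports what the given class loader can see, which may differ from what Flink's user-code class loader sees at runtime.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

/** Sketch only: lists every location on a class loader's search path that provides a class. */
public class ClasspathCopies {

    static List<URL> copiesOf(String className, ClassLoader loader) throws IOException {
        // javax.ws.rs.ext.RuntimeDelegate -> javax/ws/rs/ext/RuntimeDelegate.class
        String resource = className.replace('.', '/') + ".class";
        return Collections.list(loader.getResources(resource));
    }

    public static void main(String[] args) throws IOException {
        String className = args.length > 0 ? args[0] : "javax.ws.rs.ext.RuntimeDelegate";
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        // More than one line of output means the class is provided by several jars.
        for (URL location : copiesOf(className, loader)) {
            System.out.println(location);
        }
    }
}
```

Run with the same classpath as the Flink client, this should list both the shaded jar and jersey-core if both really bundle the class.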

On 10/14/2020 6:56 PM, Hailu, Andreas wrote:
Hi team! We're trying to upgrade our applications from 1.9.2 to 1.11.2. After 
re-compiling and updating our runtime dependencies to use 1.11.2, we see this 
LinkageError:

Caused by: java.lang.LinkageError: ClassCastException: attempting to cast 
jar:file:/local/data/scratch/hailua_p2epdlsuat/flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar!/javax/ws/rs/ext/RuntimeDelegate.class
 to 
jar:file:/local/data/scratch/hailua_p2epdlsuat/flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar!/javax/ws/rs/ext/RuntimeDelegate.class
at 
javax.ws.rs.ext.RuntimeDelegate.findDelegate(RuntimeDelegate.java:146) 
~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
at 
javax.ws.rs.ext.RuntimeDelegate.getInstance(RuntimeDelegate.java:120) 
~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
at javax.ws.rs.core.MediaType.valueOf(MediaType.java:179) 
~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
at com.sun.jersey.core.header.MediaTypes.<clinit>(MediaTypes.java:64) 
~[jersey-core-1.9.jar:1.9]
at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
 ~[jersey-core-1.9.jar:1.9]
at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
 ~[jersey-core-1.9.jar:1.9]
at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
 ~[jersey-core-1.9.jar:1.9]
at com.sun.jersey.api.client.Client.init(Client.java:342) 
~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
at com.sun.jersey.api.client.Client.access$000(Client.java:118) 
~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
at com.sun.jersey.api.client.Client$1.f(Client.java:191) 
~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
at com.sun.jersey.api.client.Client$1.f(Client.java:187) 
~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193) 
~[jersey-core-1.9.jar:1.9]
at com.sun.jersey.api.client.Client.<init>(Client.java:187) 
~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
at com.sun.jersey.api.client.Client.<init>(Client.java:170) 
~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
at 
org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285)
 ~[hadoop-yarn-common-2.7.3.2.6.3.0-235.jar:?]
at 
org.apache.hadoop.service.AbstractService.init(AbstractService.java:163) 
~[hadoop-common-2.7.3.2.6.3.0-235.jar:?]
at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getTimelineDelegationToken(YarnClientImpl.java:355)
 ~[hadoop-yarn-client-2.7.3.2.6.3.0-235.jar:?]
at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.addTimelineDelegationToken(YarnClientImpl.java:331)
 ~[hadoop-yarn-client-2.7.3.2.6.3.0-235.jar:?]
at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.submitApplication(YarnClientImpl.java:250)
 ~[hadoop-yarn-client-2.7.3.2.6.3.0-235.jar:?]
at 
org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1002)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:524)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:424)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:973)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:124)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:72)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]

I'll note that the flink-ingest-refiner jar 

Re: what's the datasets used in flink sql document?

2020-10-15 Thread David Anderson
For a dockerized playground that includes a dataset, many working examples,
and training slides, see [1].

[1] https://github.com/ververica/sql-training

David

On Thu, Oct 15, 2020 at 10:18 AM Piotr Nowojski 
wrote:

> Hi,
>
> The examples in the documentation are not fully functional. They assume
> (like in this case), that you would have an already predefined table
> orders, with the required fields. As I mentioned before, there are working
> examples available and you can also read the documentation on how to
> register tables on your own (both linked in my previous messages).
>
> Best
> Piotrek
>
> On Wed, Oct 14, 2020 at 6:12 PM 大森林 wrote:
>
>> Many thanks for your replies.
>>
>> I mean, where is the "france revenue" data
>> used in the following document?
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html
>>
>> Thanks for your help~
>>
>> -- Original Message --
>> *From:* "Piotr Nowojski" ;
>> *Sent:* Wednesday, October 14, 2020, 10:20 PM
>> *To:* "大森林";
>> *Cc:* "user";
>> *Subject:* Re: what's the datasets used in flink sql document?
>>
>> Hi,
>>
>> Can you link the document you have in mind? The documentation [1]? If so,
>> I don't think there is such a dataset.
>>
>> There are working examples, located in the binary distribution under the
>> `examples/table/` directory. Their code is available in the repository [2].
>>
>> Best regards,
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#query-a-table
>> [2]
>> https://github.com/apache/flink/tree/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics
>>
>> On Wed, Oct 14, 2020 at 3:55 PM 大森林 wrote:
>>
>>> Sorry that I did not make it clear.
>>>
>>> I mean:
>>> Is there a dataset that can be downloaded
>>> to run all the examples in the document?
>>>
>>> Thanks for your help
>>>
>>> -- Original Message --
>>> *From:* "Piotr Nowojski" ;
>>> *Sent:* Wednesday, October 14, 2020, 9:52 PM
>>> *To:* "大森林";
>>> *Cc:* "user";
>>> *Subject:* Re: what's the datasets used in flink sql document?
>>>
>>> Hi,
>>>
>>> It depends how you defined `orders` in your example. For example here [1]
>>>
>>> > Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime)
>>>
>>> `orders` is obtained from the environment, from a table registered under
>>> the name "Orders". You would need to first register such table, or register
>>> a catalog with such table [2]
>>>
>>> Piotrek
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#overview--examples
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-tableenvironment
>>>
>>> On Wed, Oct 14, 2020 at 3:15 PM 大森林 wrote:
>>>
>>>> Could anyone tell me
>>>> which datasets are used in the Flink SQL documentation?
>>>>
>>>> For SQL like:
>>>>
>>>> val revenue = orders
>>>>   .filter($"cCountry" === "FRANCE")
>>>>   .groupBy($"cID", $"cName")
>>>>   .select($"cID", $"cName", $"revenue".sum AS "revSum")
>>>>
>>>>
>>>> Thanks for your help




Flink job fails to run

2020-10-15 Thread gangzi
A question: after submitting a job with flink-1.11.1 in YARN per-job mode, the following exception was thrown:
java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/JobConf
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.getDeclaredMethod(Class.java:2128)
at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at 

akka.framesize configuration does not runtime execution

2020-10-15 Thread Yuval Itzchakov
Hi,

Due to a very large savepoint metadata file (3GB+), I've raised
akka.framesize to the 5GB that is required. I set this via the
`akka.framesize` property in flink.conf.

When trying to recover from the savepoint, the JM emits the following error
message:

"thread":"flink-akka.actor.default-dispatcher-30"
"level":"ERROR"
"loggerName":"akka.remote.EndpointWriter"
"message":"Transient "Discarding oversized payload sent to
Actor[akka.tcp://flink@XXX:XXX/user/taskmanager_0#369979612]: max allowed
size 1073741824 bytes, actual size of encoded class
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 1610683118
"name":"akka.remote.OversizedPayloadException"

As I recall, while taking the savepoint the maximum framesize was indeed
defined as 1GB.

Could it be that akka.framesize is being recovered from the stored
savepoint, thus not allowing me to reconfigure the maximum size
of the payload?

-- 
Best Regards,
Yuval Itzchakov.


Re: pyflink SQL: select and where both use a UDF, and one of the UDFs has no effect

2020-10-15 Thread Dian Fu
Could you print the plan and take a look? For how to print the plan, see: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html#explain-tables

> On Oct 15, 2020, at 7:02 PM, whh_960101 wrote:
> 
> hi,
> I just modified your example [1], building a source table with from_elements
> and then using my UDFs:
> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
> The printed result filters out exactly the data I want.
> 
> But when I build the source table from a Kafka message queue through StreamTableEnvironment, i.e. the way I wrote it before:
> source  = st_env.from_path('source')
> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
> 
> the where filter has no effect, and all of the data is printed.
> 
> If I use a UDF only in where, i.e.
> source.where(udf1(msg)=True).select("msg").execute_insert('sink').get_job_client().get_job_execution_result().result()
> the printed result is filtered as expected.
> 
> Where does this problem come from?
> 
> At 2020-10-15 16:57:39, "Xingbo Huang" wrote:
>> Hi,
>> We have an example that covers your case [1]; you need to check what the logic of your udf1 looks like.
>> 
>> 
>> [1]
>> https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67
>> 
>> Best,
>> Xingbo
>> 
>> On Thu, Oct 15, 2020 at 2:30 PM, whh_960101 wrote:
>> 
>>> Hello, my pyflink code is below, and I have the following problem:
>>> 
>>> 
>>> source  = st_env.from_path('source')
>>> # st_env is a StreamTableEnvironment, source is the Kafka source
>>> # Only udf1 in the where clause (input_types =DataTypes.STRING(), result_type =
>>> DataTypes.BOOLEAN())
>>> table =
>>> source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>> 
>>> 
>>> The result printed this way is filtered correctly.
>>> 
>>> 
>>> But if I add another UDF, udf2, in the select clause (input_types =DataTypes.STRING(), result_type =
>>> DataTypes.STRING()) (it simply turns msg into a new String)
>>> table =
>>> source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>> the where filter has no effect, and all of the data is printed.
>>> 
>>> 
>>> Putting where first does not work either, nor does switching to filter
>>> table =
>>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>>> 
>>> 
>>> Do UDFs in select and where conflict with each other? How can this be solved?
>>> I hope you can help. Thanks!
>>> 
> 



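Re: Aggregation scenario where Kafka topic messages have incomplete fields

2020-10-15 Thread 史 正超
@Kyle Zhang Thanks for the reply; what I have now is done in pretty much the way you described.


From: Kyle Zhang 
Sent: Thursday, October 15, 2020 6:56:08 PM
To: user-zh@flink.apache.org 
Subject: Re: Aggregation scenario where Kafka topic messages have incomplete fields

Grouping by id should be enough; for the other fields use last value or first value [1]. You also need to think about how to handle late data.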

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
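
To make the suggestion concrete, here is a plain-Java sketch of what "group by id and keep the last value of each field" computes for the two sample messages in this thread. It is not Flink code: LastValueMerge and mergeById are hypothetical names, and the choice to keep the last non-null value per field is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: folds partial messages into one row per id. */
public class LastValueMerge {

    /** Keeps, for every id, the last non-null value seen for each field. */
    static Map<String, Map<String, String>> mergeById(List<Map<String, String>> messages) {
        Map<String, Map<String, String>> rowsById = new LinkedHashMap<>();
        for (Map<String, String> message : messages) {
            Map<String, String> row =
                    rowsById.computeIfAbsent(message.get("id"), id -> new LinkedHashMap<>());
            for (Map.Entry<String, String> field : message.entrySet()) {
                if (field.getValue() != null) {
                    row.put(field.getKey(), field.getValue());
                }
            }
        }
        return rowsById;
    }

    public static void main(String[] args) {
        Map<String, String> first = new HashMap<>();
        first.put("id", "1");
        first.put("field2", "b");

        Map<String, String> second = new HashMap<>();
        second.put("id", "1");
        second.put("field3", "c");
        second.put("field4", "d");

        List<Map<String, String>> messages = new ArrayList<>();
        messages.add(first);
        messages.add(second);

        // A single merged row for id 1 that carries field2, field3 and field4.
        System.out.println(mergeById(messages));
    }
}
```

The merged row per id is what the later count over (id, field2, field3, field4) would then be computed on. Late or out-of-order messages are not handled here.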

On Thu, Oct 15, 2020 at 5:01 PM 史 正超  wrote:

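> Hi all, I have the following scenario:
> A Kafka topic has 4 fields: id, field2, field3, field4, where id is the unique identifier.
> The problem is that not every message carries all of the fields; only id is always present. I need to
> aggregate using id, field2, field3, field4 as a single dimension. For example, given the following Kafka messages:
> {"id": 1, "field2":"b"}
> {"id": 1, "field3":"c", "field4":"d"}
> a count(1) over that dimension (group by id, field2, field3, field4) should give this result:
> (1,b,  c, d)=> 1
>
> Is there any approach for this kind of requirement?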
>


Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

2020-10-15 Thread Chesnay Schepler
The InetAddress caches the result of getCanonicalHostName(), so it is 
not a problem to call it twice.


On 10/15/2020 1:57 PM, Till Rohrmann wrote:

Hi Weike,

thanks for getting back to us with your findings. Looking at the 
`TaskManagerLocation`, we are actually calling 
`InetAddress.getCanonicalHostName` twice for every creation of a 
`TaskManagerLocation` instance. This does not look right.


I think it should be fine to make the lookup configurable. Moreover, 
one could think about only doing a lazy lookup if the canonical 
hostname is really needed (as far as I can see it is only really 
needed for input split assignments and for 
the LocationPreferenceSlotSelectionStrategy to calculate how many TMs 
run on the same machine).


Do you want to fix this issue?

Cheers,
Till

On Thu, Oct 15, 2020 at 11:38 AM DONG, Weike wrote:


Hi Till and community,

By the way, initially I resolved the IPs several times but results
returned rather quickly (less than 1ms, possibly due to DNS cache
on the server), so I thought it might not be the DNS issue.

However, after debugging and logging, it is found that the lookup
time exhibited high variance, i. e. normally it completes fast but
occasionally some slow results would block the thread. So an
unstable DNS server might have a great impact on the performance
of Flink job startup.

Best,
Weike

On Thu, Oct 15, 2020 at 5:19 PM DONG, Weike
<kyled...@connect.hku.hk> wrote:

Hi Till and community,

Increasing `kubernetes.jobmanager.cpu` in the configuration
alleviates this issue but does not make it disappear.

After adding DEBUG logs to the internals of flink-runtime,
we have found that the culprit is

inetAddress.getCanonicalHostName()

in
org.apache.flink.runtime.taskmanager.TaskManagerLocation#getHostName
and
org.apache.flink.runtime.taskmanager.TaskManagerLocation#getFqdnHostName,
which could take ~ 6 seconds to complete, so the Akka
dispatcher(s) are severely blocked by it.

By commenting out the two methods, this issue seems to be
solved immediately, so I wonder if Flink could provide a
configuration parameter to turn off the DNS reverse lookup
process, as it seems that Flink jobs could run happily without it.

Sincerely,
Weike


On Tue, Oct 13, 2020 at 6:52 PM Till Rohrmann
<trohrm...@apache.org> wrote:

Hi Weike,

could you try setting kubernetes.jobmanager.cpu: 4 in your
flink-conf.yaml? I fear that a single CPU is too low for
the JobManager component.

Cheers,
Till

On Tue, Oct 13, 2020 at 11:33 AM Till Rohrmann
<trohrm...@apache.org> wrote:

Hi Weike,

thanks for posting the logs. I will take a look at
them. My suspicion would be that there is some
operation blocking the JobMaster's main thread which
causes the registrations from the TMs to time out.
Maybe the logs allow me to validate/falsify this
suspicion.

Cheers,
Till

On Mon, Oct 12, 2020 at 10:43 AM DONG, Weike
<kyled...@connect.hku.hk> wrote:

Hi community,

I have uploaded the log files of JobManager and
TaskManager-1-1 (one of the 50 TaskManagers) with
DEBUG log level and default Flink configuration,
and it clearly shows that TaskManager failed to
register with JobManager after 10 attempts.

Here is the link:

JobManager:

https://gist.github.com/kylemeow/740c470d9b5a1ab3552376193920adce

TaskManager-1-1:

https://gist.github.com/kylemeow/41b9a8fe91975875c40afaf58276c2fe


Thanks : )

Best regards,
Weike


On Mon, Oct 12, 2020 at 4:14 PM DONG, Weike
<kyled...@connect.hku.hk> wrote:

Hi community,

Recently we have noticed a strange behavior
for Flink jobs on Kubernetes per-job mode:
when the parallelism increases, the time it
takes for the TaskManagers to register with
JobManager becomes abnormally long (for a
task with parallelism of 50, it could take 60
~ 120 seconds or even longer for the
registration attempt), and usually more than
10 attempts are needed to finish this

Re: Correct way to handle RedisSink exception

2020-10-15 Thread Manas Kale
Hi all,
Thank you for the help, I understand now.

On Thu, Oct 15, 2020 at 5:28 PM 阮树斌 浙江大学  wrote:

> hello, Manas Kale.
>
> From the log, you can see that the exception was thrown in the
> 'open()' method of the RedisSink class. You can subclass RedisSink,
> override the 'open()' method, and handle the exception as you
> wish. Alternatively, stop using the Apache Bahir[1] Flink redis connector library
> and extend RichSinkFunction to develop a custom RedisSink class.
>
> Regards
> Shubin Ruan
>
> At 2020-10-15 19:27:29, "Manas Kale"  wrote:
>
> Hi,
> I have a streaming application that pushes output to a redis cluster sink.
> I am using the Apache Bahir[1] Flink redis connector for this. I want to
> handle the case when the redis server is unavailable.
> I am following the same pattern as outlined by them in [1]:
>
> FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
>     .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();
>
> DataStream<String> stream = ...;
> stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
>
> However, if the redis server is not available, my whole job crashes with
> this exception:
>
> ERROR org.apache.flink.streaming.connectors.redis.RedisSink -
> Redis has not been properly initialized:
> redis.clients.jedis.exceptions.JedisConnectionException: Could not get a
> resource from the pool
> at redis.clients.util.Pool.getResource(Pool.java:53)
> at redis.clients.jedis.JedisPool.getResource(JedisPool.java:226)
> ...
>
> I want to handle and ignore such exceptions thrown by the RedisSink class.
> Where exactly do I put my try/catch to do this? Enclosing the last line in the
> code snippet with try/catch does not work.
> I believe the only way to do this would be to handle the exception in the
> RedisSink class, but that is a library class provided by Bahir. Is my
> thinking correct?
>
>
> [1] https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
>
>
> Regards,
> Manas
>


Re:Correct way to handle RedisSink exception

2020-10-15 Thread 阮树斌 浙江大学
Hello, Manas Kale.


From the log, you can see that the exception was thrown in the 'open()' 
method of the RedisSink class. You can subclass RedisSink, 
override the 'open()' method, and handle the exception as you wish. Alternatively, stop 
using the Apache Bahir[1] Flink redis connector library and extend 
RichSinkFunction to develop a custom RedisSink class.
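
A language-neutral sketch of the subclass-and-override pattern described above, written in Python for brevity. LibrarySink, TolerantSink, and ConnectionUnavailable are hypothetical stand-ins and not part of the Bahir connector. The point is only that open() swallows the connection error and invoke() must then cope with a missing client.

```python
class ConnectionUnavailable(Exception):
    """Hypothetical stand-in for the connection error raised by the client library."""


class LibrarySink:
    """Hypothetical stand-in for a library sink whose open() connects eagerly."""

    def __init__(self, connect):
        self._connect = connect
        self.client = None

    def open(self):
        self.client = self._connect()

    def invoke(self, value):
        self.client.append(value)


class TolerantSink(LibrarySink):
    """Subclass that survives a failed open() and drops records while disconnected."""

    def __init__(self, connect):
        super().__init__(connect)
        self.dropped = 0

    def open(self):
        try:
            super().open()
        except ConnectionUnavailable:
            self.client = None  # keep running without a connection

    def invoke(self, value):
        if self.client is None:
            self.dropped += 1  # dropping is one choice; retrying is another
            return
        super().invoke(value)


def failing_connect():
    raise ConnectionUnavailable("could not get a resource from the pool")


sink = TolerantSink(failing_connect)
sink.open()
sink.invoke("a")
print(sink.dropped)
```

Whether dropped records are acceptable depends on the job; a real implementation would probably also try to reconnect.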


Regards
Shubin Ruan

At 2020-10-15 19:27:29, "Manas Kale"  wrote:

Hi,
I have a streaming application that pushes output to a redis cluster sink. I am 
using the Apache Bahir[1] Flink redis connector for this. I want to handle the 
case when the redis server is unavailable. 
I am following the same pattern as outlined by them in [1]:
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
    .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();

DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
However, if the redis server is not available, my whole job crashes with this 
exception:


ERROR org.apache.flink.streaming.connectors.redis.RedisSink - Redis has 
not been properly initialized:
redis.clients.jedis.exceptions.JedisConnectionException: Could not get a 
resource from the pool
at redis.clients.util.Pool.getResource(Pool.java:53)
at redis.clients.jedis.JedisPool.getResource(JedisPool.java:226)

...


I want to handle and ignore such exceptions thrown by the RedisSink class. 
Where exactly do I put my try/catch to do this? Enclosing the last line in the code 
snippet with try/catch does not work.
I believe the only way to do this would be to handle the exception in the 
RedisSink class, but that is a library class provided by Bahir. Is my thinking 
correct?




[1] https://bahir.apache.org/docs/flink/current/flink-streaming-redis/




Regards,
Manas




 

Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

2020-10-15 Thread Till Rohrmann
Hi Weike,

thanks for getting back to us with your findings. Looking at the
`TaskManagerLocation`, we are actually calling
`InetAddress.getCanonicalHostName` twice for every creation of a
`TaskManagerLocation` instance. This does not look right.

I think it should be fine to make the lookup configurable. Moreover, one
could think about only doing a lazy lookup if the canonical hostname is
really needed (as far as I can see it is only really needed for input split
assignments and for the LocationPreferenceSlotSelectionStrategy to
calculate how many TMs run on the same machine).
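
A small Python sketch of what such a lazy lookup could look like. LazyHostLocation and its resolver parameter are hypothetical and do not reflect Flink's TaskManagerLocation. The reverse lookup is deferred until the name is first requested and cached afterwards, so constructing the object never blocks on DNS.

```python
import socket
from typing import Callable, Optional


class LazyHostLocation:
    """Holds an IP address and resolves its fully qualified name only on first use."""

    def __init__(self, ip_address: str, resolver: Callable[[str], str] = socket.getfqdn):
        self.ip_address = ip_address
        self._resolver = resolver
        self._fqdn: Optional[str] = None
        self.lookups = 0  # number of (potentially slow) resolver calls made so far

    @property
    def fqdn(self) -> str:
        if self._fqdn is None:
            self.lookups += 1
            self._fqdn = self._resolver(self.ip_address)
        return self._fqdn


# A fake resolver keeps the example independent of any real DNS server.
location = LazyHostLocation("10.0.0.1", resolver=lambda ip: "tm-1.example.internal")
lookups_before_access = location.lookups
first = location.fqdn
second = location.fqdn
print(lookups_before_access, location.lookups, first == second)
```

Code paths that never ask for the canonical name would never pay for the lookup, which matches the observation that jobs ran fine with the lookup disabled.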

Do you want to fix this issue?

Cheers,
Till

On Thu, Oct 15, 2020 at 11:38 AM DONG, Weike 
wrote:

> Hi Till and community,
>
> By the way, initially I resolved the IPs several times but results
> returned rather quickly (less than 1ms, possibly due to DNS cache on the
> server), so I thought it might not be the DNS issue.
>
> However, after debugging and logging, it is found that the lookup time
> exhibited high variance, i. e. normally it completes fast but occasionally
> some slow results would block the thread. So an unstable DNS server might
> have a great impact on the performance of Flink job startup.
>
> Best,
> Weike
>
> On Thu, Oct 15, 2020 at 5:19 PM DONG, Weike 
> wrote:
>
>> Hi Till and community,
>>
>> Increasing `kubernetes.jobmanager.cpu` in the configuration alleviates this
>> issue but does not make it disappear.
>>
>> After adding DEBUG logs to the internals of *flink-runtime*, we have
>> found the culprit is
>>
>> inetAddress.getCanonicalHostName()
>>
>> in *org.apache.flink.runtime.taskmanager.TaskManagerLocation#getHostName*
>> and
>> *org.apache.flink.runtime.taskmanager.TaskManagerLocation#getFqdnHostName*,
>> which could take ~ 6 seconds to complete, thus Akka dispatcher(s)
>> are severely blocked by that.
>>
>> By commenting out the two methods, this issue seems to be solved
>> immediately, so I wonder if Flink could provide a configuration parameter
>> to turn off the DNS reverse lookup process, as it seems that Flink jobs
>> could run happily without it.
>>
>> Sincerely,
>> Weike
>>
>>
>> On Tue, Oct 13, 2020 at 6:52 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Weike,
>>>
>>> could you try setting kubernetes.jobmanager.cpu: 4 in your
>>> flink-conf.yaml? I fear that a single CPU is too low for the JobManager
>>> component.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Oct 13, 2020 at 11:33 AM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Weike,
>>>>
>>>> thanks for posting the logs. I will take a look at them. My suspicion
>>>> would be that there is some operation blocking the JobMaster's main thread
>>>> which causes the registrations from the TMs to time out. Maybe the logs
>>>> allow me to validate/falsify this suspicion.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Mon, Oct 12, 2020 at 10:43 AM DONG, Weike 
>>>> wrote:
>>>>
>>>>> Hi community,
>>>>>
>>>>> I have uploaded the log files of JobManager and TaskManager-1-1 (one
>>>>> of the 50 TaskManagers) with DEBUG log level and default Flink
>>>>> configuration, and it clearly shows that TaskManager failed to register
>>>>> with JobManager after 10 attempts.
>>>>>
>>>>> Here is the link:
>>>>>
>>>>> JobManager:
>>>>> https://gist.github.com/kylemeow/740c470d9b5a1ab3552376193920adce
>>>>>
>>>>> TaskManager-1-1:
>>>>> https://gist.github.com/kylemeow/41b9a8fe91975875c40afaf58276c2fe
>>>>>
>>>>> Thanks : )
>>>>>
>>>>> Best regards,
>>>>> Weike
>>>>>
>>>>>
>>>>> On Mon, Oct 12, 2020 at 4:14 PM DONG, Weike 
>>>>> wrote:
>>>>>
>>>>>> Hi community,
>>>>>>
>>>>>> Recently we have noticed a strange behavior for Flink jobs on
>>>>>> Kubernetes per-job mode: when the parallelism increases, the time it takes
>>>>>> for the TaskManagers to register with JobManager becomes
>>>>>> abnormally long (for a task with parallelism of 50, it could take 60 ~ 120
>>>>>> seconds or even longer for the registration attempt), and usually more than
>>>>>> 10 attempts are needed to finish this registration.
>>>>>>
>>>>>> Because of this, we could not submit a job requiring more than 20
>>>>>> slots with the default configuration, as the TaskManager would say:
>>>>>>
>>>>>>
>>>>>>> Registration at JobManager 
>>>>>>> (akka.tcp://flink@myjob-201076.default:6123/user/rpc/jobmanager_2)
>>>>>>> attempt 9 timed out after 25600 ms
>>>>>>
>>>>>> Free slot with allocation id 60d5277e138a94fb73fc6691557001e0
>>>>>>> because: The slot 60d5277e138a94fb73fc6691557001e0 has timed out.
>>>>>>
>>>>>> Free slot TaskSlot(index:0, state:ALLOCATED, resource profile:
>>>>>>> ResourceProfile{cpuCores=1., taskHeapMemory=1.425gb
>>>>>>> (1530082070 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.340gb
>>>>>>> (1438814063 bytes), networkMemory=343.040mb (359703515 bytes)},
>>>>>>> allocationId: 60d5277e138a94fb73fc6691557001e0, jobId:
>>>>>>> 493cd86e389ccc8f2887e1222903b5ce).
>>>>>>> java.lang.Exception: The slot 60d5277e138a94fb73fc6691557001e0 has
>>>>>>> timed out.
>>>>>>
>>>>>>

Re: StatsD metric name prefix change for task manager after upgrading to Flink 1.11

2020-10-15 Thread Chesnay Schepler
The TaskExecutor host being exposed is directly wired to what the RPC 
system uses for addresses, which may have changed due to NAT support 
(FLINK-15911).


If the problem is purely about the periods in the IP, then I would 
suggest creating a custom reporter that extends the StatsDReporter and 
overrides filterCharacters to also replace periods.
This also reminds me of a suggestion we got in the past where we 
automatically replace occurrences of the delimiter; let me open an issue 
for that...
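
To illustrate the effect outside of Flink, here is a small Python sketch. The helper names are hypothetical and this is not the StatsDReporter API. It shows why an IP address used as the first scope segment yields "10" as the source, and how replacing the delimiter inside each segment avoids that.

```python
def statsd_source(metric_name):
    """Return the segment before the first dot, which is treated as the source in this thread."""
    return metric_name.split(".", 1)[0]


def filter_characters(segment, delimiter=".", replacement="-"):
    """Replace occurrences of the scope delimiter inside a single name segment."""
    return segment.replace(delimiter, replacement)


def build_metric_name(segments, delimiter="."):
    """Join segments with the delimiter after filtering each one."""
    return delimiter.join(filter_characters(s, delimiter) for s in segments)


segments = ["10.4.155.205", "taskmanager", "assigned-partitions"]
raw = ".".join(segments)                # what the thread describes for 1.11
filtered = build_metric_name(segments)  # periods inside the host segment replaced
print(statsd_source(raw), statsd_source(filtered))
```

In the actual reporter the same replacement would go into the overridden filterCharacters method, as suggested above.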


On 10/14/2020 6:54 PM, Allen Wang wrote:

Hello,

We noticed that after upgrading to Flink 1.11, the StatsD metric 
prefix changed from the hostname to the IP address of the task manager.


The Flink job runs in a k8s cluster.

Here is an example of a metric reported to StatsD in Flink 1.10:
flink-ingest-cx-home-page-feed-flink-task-manager-7f8c7677l85pl.taskmanager.16c2dbc84eb27f336455615e642c6cdd.flink-ingest-cx-home-page-feed.Source- Custom Source.1.assigned-partitions:3.0|g
Here is an example of a metric reported to StatsD in Flink 1.11:
10.4.155.205.taskmanager.0a900ab762d7d534ea8b20e84438166b.flink-ingest-xp-xp.Source- Custom Source.0.assigned-partitions:3.0|g
This caused a problem for us as StatsD interprets the segment before the first dot as the source. So after upgrading to 
1.11, the task manager metrics all have "10" as the source.
Is there any configuration to change this behavior back to the 1.10 
version where the prefix of the metric is the host name?


Thanks,
Allen





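Re: Re: Loading an external jar for UDF registration in flink1.11

2020-10-15 Thread amen...@163.com
A follow-up question: when I use the thread context class loader, every record is sent three times. Is that caused by adding pipeline.classpaths?
And what other problems might this way of setting up the env cause?

best,
amenhub
 
From: amen...@163.com
Sent: 2020-10-15 19:22
To: user-zh
Subject: Re: Re: Loading an external jar for UDF registration in flink1.11
Thank you very much for your reply!
 
For me this is a very good approach, but I have one question: can the jar only be loaded with the system class loader?
When I tried the system class loader, the interface that the job jar itself exposes for external UDF jars 
to implement threw a ClassNotFound exception, whereas pointing the class loader at the main class (which I assume means using the default thread context class loader) avoids the problem.
 
Looking forward to your reply, thanks~
 
best, 
amenhub
From: cxydeve...@163.com
Sent: 2020-10-15 17:46
To: user-zh
Subject: Re: Loading an external jar for UDF registration in flink1.11
Our approach is to set the env configuration via reflection and add pipeline.classpaths.
The code is as follows: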
public static void main(final String[] args) throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings =
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnvironment =
            StreamTableEnvironment.create(env, settings);
    //String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
    String path = "https://...xxx.jar";
    loadJar(new URL(path));
    // Reach into the environment's private Configuration via reflection
    Field configuration =
            StreamExecutionEnvironment.class.getDeclaredField("configuration");
    configuration.setAccessible(true);
    Configuration o = (Configuration) configuration.get(env);
    Field confData = Configuration.class.getDeclaredField("confData");
    confData.setAccessible(true);
    Map<String, Object> temp = (Map<String, Object>) confData.get(o);
    List<String> jarList = new ArrayList<>();
    jarList.add(path);
    // Register the jar under pipeline.classpaths
    temp.put("pipeline.classpaths", jarList);
    tableEnvironment.executeSql(
            "CREATE FUNCTION CxyTestReturnSelf AS 'flinksql.function.udf.CxyTestReturnSelf'");
    tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
            " f_sequence INT,\n" +
            " f_random INT,\n" +
            " f_random_str STRING,\n" +
            " ts AS localtimestamp,\n" +
            " WATERMARK FOR ts AS ts\n" +
            ") WITH (\n" +
            " 'connector' = 'datagen',\n" +
            " 'rows-per-second'='5',\n" +
            "\n" +
            " 'fields.f_sequence.kind'='sequence',\n" +
            " 'fields.f_sequence.start'='1',\n" +
            " 'fields.f_sequence.end'='1000',\n" +
            "\n" +
            " 'fields.f_random.min'='1',\n" +
            " 'fields.f_random.max'='1000',\n" +
            "\n" +
            " 'fields.f_random_str.length'='10'\n" +
            ")");
    tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
            "f_random_str STRING" +
            ") WITH (\n" +
            "'connector' = 'print'\n" +
            ")");
    tableEnvironment.executeSql(
            "insert into sinktable " +
            "select CxyTestReturnSelf(f_random_str) " +
            "from sourceTable");
}

// Dynamically load a jar
public static void loadJar(URL jarUrl) {
    // Get the addURL method from the URLClassLoader class
    Method method = null;
    try {
        method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
    } catch (NoSuchMethodException | SecurityException e1) {
        e1.printStackTrace();
    }
    // Remember the method's current accessibility
    boolean accessible = method.isAccessible();
    try {
        // Make the method accessible
        if (accessible == false) {
            method.setAccessible(true);
        }
        // Get the system class loader
        // (on Java 9+ the system class loader is not a URLClassLoader, so this cast fails there)
        URLClassLoader classLoader = (URLClassLoader)
                ClassLoader.getSystemClassLoader();
        // Add the jar URL to the system class loader's URL path
        method.invoke(classLoader, jarUrl);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        method.setAccessible(accessible);
    }
}
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Correct way to handle RedisSink exception

2020-10-15 Thread Chesnay Schepler
You will have to create a custom version of the redis connector that 
ignores such exceptions.


On 10/15/2020 1:27 PM, Manas Kale wrote:

Hi,
I have a streaming application that pushes output to a redis cluster 
sink. I am using the Apache Bahir[1] Flink redis connector for this. I 
want to handle the case when the redis server is unavailable.

I am following the same pattern as outlined by them in [1]:
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
    .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();

DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
However, if the redis server is not available, my whole job crashes 
with this exception:


ERROR org.apache.flink.streaming.connectors.redis.RedisSink         - 
Redis has not been properly initialized:
redis.clients.jedis.exceptions.JedisConnectionException: Could not get 
a resource from the pool

at redis.clients.util.Pool.getResource(Pool.java:53)
at redis.clients.jedis.JedisPool.getResource(JedisPool.java:226)
...

I want to handle and ignore such exceptions thrown by the RedisSink 
class. Where exactly do I put my try/catch to do this? Enclosing the 
last line in the code snippet with try/catch does not work.
I believe the only way to do this would be to handle the exception in 
the RedisSink class, but that is a library class provided by Bahir. Is 
my thinking correct?



[1] https://bahir.apache.org/docs/flink/current/flink-streaming-redis/


Regards,
Manas





Re: Runtime Dependency Issues Upgrading to Flink 1.11.2 from 1.9.2

2020-10-15 Thread Chesnay Schepler
I'm not aware of any Flink module bundling this class. Note that this 
class is also bundled in jersey-core (which is also on your classpath), 
so it appears that there is a conflict between this jar and your shaded one.
Have you changed the Hadoop version you are using or how you provide 
them to Flink?


On 10/14/2020 6:56 PM, Hailu, Andreas wrote:


Hi team! We’re trying to upgrade our applications from 1.9.2 to 
1.11.2. After re-compiling and updating our runtime dependencies to 
use 1.11.2, we see this LinkageError:


Caused by: java.lang.LinkageError: ClassCastException: attempting to cast jar:file:/local/data/scratch/hailua_p2epdlsuat/flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar!/javax/ws/rs/ext/RuntimeDelegate.class to jar:file:/local/data/scratch/hailua_p2epdlsuat/flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar!/javax/ws/rs/ext/RuntimeDelegate.class
    at javax.ws.rs.ext.RuntimeDelegate.findDelegate(RuntimeDelegate.java:146) ~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
    at javax.ws.rs.ext.RuntimeDelegate.getInstance(RuntimeDelegate.java:120) ~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
    at javax.ws.rs.core.MediaType.valueOf(MediaType.java:179) ~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
    at com.sun.jersey.core.header.MediaTypes.<clinit>(MediaTypes.java:64) ~[jersey-core-1.9.jar:1.9]
    at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182) ~[jersey-core-1.9.jar:1.9]
    at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175) ~[jersey-core-1.9.jar:1.9]
    at com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162) ~[jersey-core-1.9.jar:1.9]
    at com.sun.jersey.api.client.Client.init(Client.java:342) ~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
    at com.sun.jersey.api.client.Client.access$000(Client.java:118) ~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
    at com.sun.jersey.api.client.Client$1.f(Client.java:191) ~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
    at com.sun.jersey.api.client.Client$1.f(Client.java:187) ~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
    at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193) ~[jersey-core-1.9.jar:1.9]
    at com.sun.jersey.api.client.Client.<init>(Client.java:187) ~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
    at com.sun.jersey.api.client.Client.<init>(Client.java:170) ~[flink-ingest-refiner-sandbox-SNAPSHOT-fat-shaded.jar:?]
    at org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285) ~[hadoop-yarn-common-2.7.3.2.6.3.0-235.jar:?]
    at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163) ~[hadoop-common-2.7.3.2.6.3.0-235.jar:?]
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getTimelineDelegationToken(YarnClientImpl.java:355) ~[hadoop-yarn-client-2.7.3.2.6.3.0-235.jar:?]
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.addTimelineDelegationToken(YarnClientImpl.java:331) ~[hadoop-yarn-client-2.7.3.2.6.3.0-235.jar:?]
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.submitApplication(YarnClientImpl.java:250) ~[hadoop-yarn-client-2.7.3.2.6.3.0-235.jar:?]
    at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1002) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:524) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:424) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:973) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:124) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:72) ~[flink-dist_2.11-1.11.2.jar:1.11.2]


I’ll note that the flink-ingest-refiner jar is our shaded JAR 
application that we use to submit jobs.


Looking into what dependencies have changed, on 1.9.2 our runtime 
dependencies from the available artifacts (sourced from one of the 
many mirrors)  are:


1. flink-dist_2.11-1.9.2.jar
2. flink-table-blink_2.11-1.9.2.jar
3. flink-table_2.11-1.9.2.jar
4. log4j-1.2.17.jar
5. slf4j-log4j12-1.7.15.jar

Whereas 1.11.2’s dependencies are:

1. flink-dist_2.11-1.11.2.jar
2. flink-table-blink_2.11-1.11.2.jar


Correct way to handle RedisSink exception

2020-10-15 Thread Manas Kale
Hi,
I have a streaming application that pushes output to a redis cluster sink.
I am using the Apache Bahir[1] Flink redis connector for this. I want to
handle the case when the redis server is unavailable.
I am following the same pattern as outlined by them in [1]:

FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
    .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();

DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));

However, if the redis server is not available, my whole job crashes with
this exception:

ERROR org.apache.flink.streaming.connectors.redis.RedisSink - Redis
has not been properly initialized:
redis.clients.jedis.exceptions.JedisConnectionException: Could not get a
resource from the pool
at redis.clients.util.Pool.getResource(Pool.java:53)
at redis.clients.jedis.JedisPool.getResource(JedisPool.java:226)
...

I want to handle and ignore such exceptions thrown by the RedisSink class.
Where exactly do I put my try/catch to do this? Enclosing the last line in the
code snippet with try/catch does not work.
I believe the only way to do this would be to handle the exception in the
RedisSink class, but that is a library class provided by Bahir. Is my
thinking correct?


[1] https://bahir.apache.org/docs/flink/current/flink-streaming-redis/


Regards,
Manas


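Re: Re: Loading an external jar for UDF registration in flink1.11

2020-10-15 Thread amen...@163.com
Thank you very much for your reply!

For me this is a very good approach, but I have one question: can the jar only be loaded with the system class loader?
When I tried the system class loader, the interface that the job jar itself exposes for external UDF jars 
to implement threw a ClassNotFound exception, whereas pointing the class loader at the main class (which I assume means using the default thread context class loader) avoids the problem.

Looking forward to your reply, thanks~

best, 
amenhub
 
From: cxydeve...@163.com
Sent: 2020-10-15 17:46
To: user-zh
Subject: Re: Loading an external jar for UDF registration in flink1.11
Our approach is to set the env configuration via reflection and add pipeline.classpaths.
The code is as follows: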
public static void main(final String[] args) throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings =
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnvironment =
            StreamTableEnvironment.create(env, settings);
    //String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
    String path = "https://...xxx.jar";
    loadJar(new URL(path));

    // Reach into the environment's private Configuration via reflection
    Field configuration =
            StreamExecutionEnvironment.class.getDeclaredField("configuration");
    configuration.setAccessible(true);
    Configuration o = (Configuration) configuration.get(env);

    Field confData = Configuration.class.getDeclaredField("confData");
    confData.setAccessible(true);
    Map<String, Object> temp = (Map<String, Object>) confData.get(o);
    List<String> jarList = new ArrayList<>();
    jarList.add(path);
    // Register the jar under pipeline.classpaths
    temp.put("pipeline.classpaths", jarList);

    tableEnvironment.executeSql(
            "CREATE FUNCTION CxyTestReturnSelf AS 'flinksql.function.udf.CxyTestReturnSelf'");
    tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
            " f_sequence INT,\n" +
            " f_random INT,\n" +
            " f_random_str STRING,\n" +
            " ts AS localtimestamp,\n" +
            " WATERMARK FOR ts AS ts\n" +
            ") WITH (\n" +
            " 'connector' = 'datagen',\n" +
            " 'rows-per-second'='5',\n" +
            "\n" +
            " 'fields.f_sequence.kind'='sequence',\n" +
            " 'fields.f_sequence.start'='1',\n" +
            " 'fields.f_sequence.end'='1000',\n" +
            "\n" +
            " 'fields.f_random.min'='1',\n" +
            " 'fields.f_random.max'='1000',\n" +
            "\n" +
            " 'fields.f_random_str.length'='10'\n" +
            ")");
    tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
            "f_random_str STRING" +
            ") WITH (\n" +
            "'connector' = 'print'\n" +
            ")");
    tableEnvironment.executeSql(
            "insert into sinktable " +
            "select CxyTestReturnSelf(f_random_str) " +
            "from sourceTable");
}

// Dynamically load a jar
public static void loadJar(URL jarUrl) {
    // Get the addURL method from the URLClassLoader class
    Method method = null;
    try {
        method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
    } catch (NoSuchMethodException | SecurityException e1) {
        e1.printStackTrace();
    }
    // Remember the method's current accessibility
    boolean accessible = method.isAccessible();
    try {
        // Make the method accessible
        if (accessible == false) {
            method.setAccessible(true);
        }
        // Get the system class loader
        // (on Java 9+ the system class loader is not a URLClassLoader, so this cast fails there)
        URLClassLoader classLoader = (URLClassLoader)
                ClassLoader.getSystemClassLoader();
        // Add the jar URL to the system class loader's URL path
        method.invoke(classLoader, jarUrl);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        method.setAccessible(accessible);
    }
}
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Broadcasting control messages to a sink

2020-10-15 Thread aj
Hi Jaffe,

I am also working on a similar problem.

I am receiving a set of events in Avro format on different topics. I want
to consume these and write them to S3 in Parquet format.
I have written a job that creates a separate stream for each event and
fetches its schema from the Confluent Schema Registry to create a
Parquet sink for that event.
This is working fine, but the only problem I am facing is that whenever a new
event starts arriving or a schema changes, I have to update the
YAML config and restart the job. Is there any way to avoid
restarting the job and have it start consuming the new set of events?

As I see you are handling schema evolution, can you help me with this and also
with how I can handle the new events? In the config, I am keeping a mapping of
events to schema subjects. Please share how you are solving this.


This is how I am currently doing it, but I would like to know a better way
to handle it.

FlinkKafkaConsumer<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topics,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        properties);

DataStream<GenericRecord> dataStream =
        streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");

try {
    for (EventConfig eventConfig : eventTypesList) {

        LOG.info("creating a stream for {}", eventConfig.getEvent_name());

        // One Parquet sink per event type, using the schema registered for that event
        final StreamingFileSink<GenericRecord> sink = StreamingFileSink.forBulkFormat
                (path, ParquetAvroWriters.forGenericRecord(
                        SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();

        // Keep only the records that belong to this event type
        DataStream<GenericRecord> outStream =
                dataStream.filter((FilterFunction<GenericRecord>) genericRecord ->
                        genericRecord != null
                                && genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name()));

        outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);

    }
} catch (Exception e) {
    e.printStackTrace();
}


On Wed, Oct 14, 2020, 23:12 Jaffe, Julian 
wrote:

> Thanks for the suggestion Piotr!
>
>
>
> The problem is that the sink needs to have access to the schema (so that
> it can write the schema only once per file instead of record) and thus
> needs to know when the schema has been updated. In this proposed
> architecture, I think the sink would still need to check each record to see
> if the current schema matches the new record or not? The main problem I
> encountered when playing around with broadcast state was that I couldn’t
> figure out how to access the broadcast state within the sink, but perhaps I
> just haven’t thought about it the right way. I’ll meditate on the docs
> further  
>
>
>
> Julian
>
>
>
> *From: *Piotr Nowojski 
> *Date: *Wednesday, October 14, 2020 at 6:35 AM
> *To: *"Jaffe, Julian" 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Broadcasting control messages to a sink
>
>
>
> Hi Julian,
>
>
>
> Have you seen Broadcast State [1]? I have never used it personally, but it
> sounds like something you want. Maybe your job should look like:
>
>
>
> 1. read raw messages from Kafka, without using the schema
>
> 2. read schema changes and broadcast them to 3. and 5.
>
> 3. deserialize kafka records in BroadcastProcessFunction by using combined
> 1. and 2.
>
> 4. do your logic o
>
> 5. serialize records using schema in another BroadcastProcessFunction by
> using combined 4. and 2.
>
> 6. write raw records using BucketingSink
>
> ?
>
>
>
> Best,
>
> Piotrek
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
> 
>
>
>
> śr., 14 paź 2020 o 11:01 Jaffe, Julian 
> napisał(a):
>
> Hey all,
>
>
>
> I’m building a Flink app that pulls in messages from a Kafka topic and
> writes them out to disk using a custom bucketed sink. Each message needs to
> be parsed using a schema that is also needed when writing in the sink. This
> schema is read from a remote file on a distributed file system (it could
> also be fetched from a service). The schema will be updated very
> infrequently.
>
>
>
> In order to support schema evolution, I have created a custom source that
> occasionally polls for updates and if it finds one parses the new schema
> and sends a message containing the serialized schema. I’ve connected these
> two streams and then use a RichCoFlatMapFunction to flatten them back into
> a single output stream (schema events get used to update the parser,
> messages get parsed using the parser and emitted).
>
>
>
> However, I need some way to communicate the updated schema to every task

Re:Re: PyFlink SQL: select and where both use a UDF, and one of the UDFs has no effect

2020-10-15 Thread whh_960101
Hi,
I just modified your example [1] to build a source table with from_elements
and then applied my UDFs:
source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
The printed result is filtered exactly as I want.

However, when I build the source table from a Kafka message queue through a StreamTableEnvironment, i.e. the way I wrote it before:
 source  = st_env.from_path('source')
 
source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()

the where filter has no effect and all of the data is printed in the end.

If I use a UDF only in where, i.e.
source.where(udf1(msg)=True).select("msg").execute_insert('sink').get_job_client().get_job_execution_result().result()
the printed result is filtered.

Where does this problem come from?

At 2020-10-15 16:57:39, "Xingbo Huang"  wrote:
>Hi,
>We have an example that covers your case [1]; you need to check what the logic of your udf1 looks like.
>
>
>[1]
>https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67
>
>Best,
>Xingbo
>
>whh_960101  wrote on Thu, Oct 15, 2020 at 2:30 PM:
>
>> Hello, my PyFlink code is below, and I have the following problem:
>>
>>
>> source  = st_env.from_path('source')
>> # st_env is a StreamTableEnvironment, source is the Kafka source
>> # udf1 only in the where clause (input_types =DataTypes.STRING(), result_type =
>> DataTypes.BOOLEAN())
>> table =
>> source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>
>>
>> The printed result is filtered correctly.
>>
>>
>> But if I add another UDF, udf2, in the select clause (input_types =DataTypes.STRING(), result_type =
>> DataTypes.STRING()) (it simply turns msg into a new String)
>> table =
>> source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>> the where filter has no effect and all of the data is printed in the end.
>>
>>
>> Putting where first does not work either, and neither does switching to filter:
>> table =
>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>>
>>
>> Can the UDFs in select and where conflict with each other? How can I solve this?
>> I hope you can help. Thanks!





 

Re:Re: PyFlink SQL: select and where both use a UDF, and one of the UDFs has no effect

2020-10-15 Thread whh_960101
Hi,
I just modified your example [1] to build a source table with from_elements
and then applied my UDFs:
source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
The printed result is filtered exactly as I want.

However, when I build the source table from a Kafka message queue through a StreamTableEnvironment, i.e. the way I wrote it before:
 source  = 
st_env.from_path('source').where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()

the where filter has no effect and all of the data is printed in the end.

Where does this problem come from?

At 2020-10-15 16:57:39, "Xingbo Huang"  wrote:
>Hi,
>We have an example that covers your case [1]; you need to check what the logic of your udf1 looks like.
>
>
>[1]
>https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67
>
>Best,
>Xingbo
>
>whh_960101  wrote on Thu, Oct 15, 2020 at 2:30 PM:
>
>> Hello, my PyFlink code is below, and I have the following problem:
>>
>>
>> source  = st_env.from_path('source')
>> # st_env is a StreamTableEnvironment, source is the Kafka source
>> # udf1 only in the where clause (input_types =DataTypes.STRING(), result_type =
>> DataTypes.BOOLEAN())
>> table =
>> source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>
>>
>> The printed result is filtered correctly.
>>
>>
>> But if I add another UDF, udf2, in the select clause (input_types =DataTypes.STRING(), result_type =
>> DataTypes.STRING()) (it simply turns msg into a new String)
>> table =
>> source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>> the where filter has no effect and all of the data is printed in the end.
>>
>>
>> Putting where first does not work either, and neither does switching to filter:
>> table =
>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>>
>>
>> Can the UDFs in select and where conflict with each other? How can I solve this?
>> I hope you can help. Thanks!


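Re: Aggregation scenario where Kafka topic messages carry incomplete fields

2020-10-15 Thread Kyle Zhang
Grouping by id should be enough; for the other fields, use last value or first value [1]. You also need to consider how to handle late data.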

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
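
To make the suggestion concrete, here is a minimal sketch in plain Python (not Flink SQL) of the "group by id, keep the last value per field" idea, followed by the count over the full dimension. The helper names `merge_by_id` and `count_by_dimension` are hypothetical, and the two sample messages are the ones from the question; late data is not handled here.

```python
from collections import Counter

FIELDS = ("field2", "field3", "field4")


def merge_by_id(messages):
    """Keep, per id, the last non-missing value seen for each field."""
    merged = {}
    for msg in messages:
        row = merged.setdefault(msg["id"], dict.fromkeys(FIELDS))
        for field in FIELDS:
            if msg.get(field) is not None:
                row[field] = msg[field]
    return merged


def count_by_dimension(merged):
    """Count ids per (id, field2, field3, field4) combination."""
    return Counter(
        (key, *(row[f] for f in FIELDS)) for key, row in merged.items()
    )


messages = [
    {"id": 1, "field2": "b"},
    {"id": 1, "field3": "c", "field4": "d"},
]
counts = count_by_dimension(merge_by_id(messages))
```

In a streaming job the merged row for an id changes as new partial messages arrive, so the count for an earlier, incomplete combination would have to be retracted; the sketch only shows the final state.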

On Thu, Oct 15, 2020 at 5:01 PM 史 正超  wrote:

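> Hi experts, I have the following scenario:
> A Kafka topic has 4 fields: id, field2, field3, field4, where id is the unique identifier.
> The problem is that not every message carries all of the fields; only id is always present. I need to aggregate with id, field2, field3, field4
> as one dimension. For example, given the following Kafka messages:
> {"id": 1, "field2":"b"}
> {"id": 1, "field3":"c", "field4":"d"}
> then count(1) by that dimension (group by id, field2, field3, field4) should give this result:
> (1,b,  c, d)=> 1
>
> Is there any approach for this kind of requirement?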
>


flink sql state queryable ?

2020-10-15 Thread kandy.wang
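I would like to know whether the contents of Flink SQL state can be queried with the queryable state API from DataStream. How would I query it, and do I need to know the key
in order to query?
What I want is simply to find out what is actually stored in the state.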

Re: Flink 1.11: loading an external jar to register a UDF

2020-10-15 Thread cxydeve...@163.com
We do it by using reflection to modify the env's configuration and add pipeline.classpaths.
The code is as follows:

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public static void main(final String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnvironment =
        StreamTableEnvironment.create(env, settings);
    //String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
    String path = "https://...xxx.jar";
    loadJar(new URL(path));

    // Reach into the environment's private Configuration via reflection
    Field configuration =
        StreamExecutionEnvironment.class.getDeclaredField("configuration");
    configuration.setAccessible(true);
    Configuration o = (Configuration) configuration.get(env);

    // Add the jar to pipeline.classpaths in the underlying config map
    Field confData = Configuration.class.getDeclaredField("confData");
    confData.setAccessible(true);
    Map<String, Object> temp = (Map<String, Object>) confData.get(o);
    List<String> jarList = new ArrayList<>();
    jarList.add(path);
    temp.put("pipeline.classpaths", jarList);

    tableEnvironment.executeSql("CREATE FUNCTION CxyTestReturnSelf AS " +
        "'flinksql.function.udf.CxyTestReturnSelf'");
    tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
        " f_sequence INT,\n" +
        " f_random INT,\n" +
        " f_random_str STRING,\n" +
        " ts AS localtimestamp,\n" +
        " WATERMARK FOR ts AS ts\n" +
        ") WITH (\n" +
        " 'connector' = 'datagen',\n" +
        " 'rows-per-second'='5',\n" +
        "\n" +
        " 'fields.f_sequence.kind'='sequence',\n" +
        " 'fields.f_sequence.start'='1',\n" +
        " 'fields.f_sequence.end'='1000',\n" +
        "\n" +
        " 'fields.f_random.min'='1',\n" +
        " 'fields.f_random.max'='1000',\n" +
        "\n" +
        " 'fields.f_random_str.length'='10'\n" +
        ")");
    tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
        "f_random_str STRING" +
        ") WITH (\n" +
        "'connector' = 'print'\n" +
        ")");
    tableEnvironment.executeSql(
        "insert into sinktable " +
        "select CxyTestReturnSelf(f_random_str) " +
        "from sourceTable");
}

// Dynamically load a jar
public static void loadJar(URL jarUrl) {
    // Get the addURL method of the URLClassLoader class
    Method method = null;
    try {
        method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
    } catch (NoSuchMethodException | SecurityException e1) {
        e1.printStackTrace();
    }
    // Remember the method's current accessibility
    boolean accessible = method.isAccessible();
    try {
        // Make the method accessible if it is not already
        if (!accessible) {
            method.setAccessible(true);
        }
        // Get the system class loader
        // (the cast assumes a URLClassLoader, which holds on Java 8 but not on Java 9+)
        URLClassLoader classLoader =
            (URLClassLoader) ClassLoader.getSystemClassLoader();
        // Add the jar URL to the system class loader's URLs
        method.invoke(classLoader, jarUrl);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        method.setAccessible(accessible);
    }
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

2020-10-15 Thread DONG, Weike
Hi Till and community,

By the way, initially I resolved the IPs several times and the results returned
rather quickly (less than 1ms, possibly due to the DNS cache on the server), so
I thought it might not be a DNS issue.

However, after debugging and logging, we found that the lookup time
exhibits high variance, i.e. normally it completes fast, but occasionally
a slow lookup blocks the thread. So an unstable DNS server might
have a great impact on the performance of Flink job startup.
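
A few fast lookups can hide this kind of variance, so it helps to time many lookups and look at the worst case rather than the typical one. Below is a minimal sketch in Python (not the Flink code path); `time_lookups` and `summarize` are hypothetical helper names, and the resolver is passed in so that any lookup function can be measured.

```python
import statistics
import time


def time_lookups(resolve, address, attempts=20):
    """Call resolve(address) repeatedly and return each call's duration in ms."""
    durations = []
    for _ in range(attempts):
        start = time.perf_counter()
        try:
            resolve(address)
        except OSError:
            pass  # a failed lookup still costs time, so keep its duration
        durations.append((time.perf_counter() - start) * 1000.0)
    return durations


def summarize(durations):
    """Report median and worst case; the worst case is what blocks a thread."""
    return {
        "median_ms": statistics.median(durations),
        "max_ms": max(durations),
    }
```

For a real reverse lookup one could pass `socket.gethostbyaddr` from the standard library as `resolve`, for example `summarize(time_lookups(socket.gethostbyaddr, "9.166.0.118"))`; a large gap between median and maximum would match the behavior described above.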

Best,
Weike

On Thu, Oct 15, 2020 at 5:19 PM DONG, Weike  wrote:

> Hi Till and community,
>
> Increasing `kubernetes.jobmanager.cpu` in the configuration alleviates this
> issue but does not eliminate it.
>
> After adding DEBUG logs to the internals of *flink-runtime*, we have
> found that the culprit is
>
> inetAddress.getCanonicalHostName()
>
> in *org.apache.flink.runtime.taskmanager.TaskManagerLocation#getHostName*
> and
> *org.apache.flink.runtime.taskmanager.TaskManagerLocation#getFqdnHostName*,
> which can take ~ 6 seconds to complete, so the Akka dispatcher(s)
> are severely blocked by it.
>
> After commenting out the two methods, this issue seems to be solved
> immediately, so I wonder if Flink could provide a configuration parameter
> to turn off the DNS reverse lookup process, as it seems that Flink jobs
> could run happily without it.
>
> Sincerely,
> Weike
>
>
> On Tue, Oct 13, 2020 at 6:52 PM Till Rohrmann 
> wrote:
>
>> Hi Weike,
>>
>> could you try setting kubernetes.jobmanager.cpu: 4 in your
>> flink-conf.yaml? I fear that a single CPU is too low for the JobManager
>> component.
>>
>> Cheers,
>> Till
>>
>> On Tue, Oct 13, 2020 at 11:33 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Weike,
>>>
>>> thanks for posting the logs. I will take a look at them. My suspicion
>>> would be that there is some operation blocking the JobMaster's main thread
>>> which causes the registrations from the TMs to time out. Maybe the logs
>>> allow me to validate/falsify this suspicion.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Oct 12, 2020 at 10:43 AM DONG, Weike 
>>> wrote:
>>>
 Hi community,

 I have uploaded the log files of JobManager and TaskManager-1-1 (one of
 the 50 TaskManagers) with DEBUG log level and default Flink configuration,
 and it clearly shows that TaskManager failed to register with JobManager
 after 10 attempts.

 Here is the link:

 JobManager:
 https://gist.github.com/kylemeow/740c470d9b5a1ab3552376193920adce

 TaskManager-1-1:
 https://gist.github.com/kylemeow/41b9a8fe91975875c40afaf58276c2fe

 Thanks : )

 Best regards,
 Weike


 On Mon, Oct 12, 2020 at 4:14 PM DONG, Weike 
 wrote:

> Hi community,
>
> Recently we have noticed a strange behavior for Flink jobs on
> Kubernetes per-job mode: when the parallelism increases, the time it takes
> for the TaskManagers to register with *JobManager *becomes abnormally
> long (for a task with parallelism of 50, it could take 60 ~ 120 seconds or
> even longer for the registration attempt), and usually more than 10
> attempts are needed to finish this registration.
>
> Because of this, we could not submit a job requiring more than 20
> slots with the default configuration, as the TaskManager would say:
>
>
>> Registration at JobManager 
>> (akka.tcp://flink@myjob-201076.default:6123/user/rpc/jobmanager_2)
>> attempt 9 timed out after 25600 ms
>
> Free slot with allocation id 60d5277e138a94fb73fc6691557001e0 because:
>> The slot 60d5277e138a94fb73fc6691557001e0 has timed out.
>
> Free slot TaskSlot(index:0, state:ALLOCATED, resource profile:
>> ResourceProfile{cpuCores=1., taskHeapMemory=1.425gb
>> (1530082070 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.340gb
>> (1438814063 bytes), networkMemory=343.040mb (359703515 bytes)},
>> allocationId: 60d5277e138a94fb73fc6691557001e0, jobId:
>> 493cd86e389ccc8f2887e1222903b5ce).
>> java.lang.Exception: The slot 60d5277e138a94fb73fc6691557001e0 has
>> timed out.
>
>
> In order to cope with this issue, we have to change the below
> configuration parameters:
>
>>
>> # Prevent "Could not allocate the required slot within slot request
>> timeout. Please make sure that the cluster has enough resources. Stopping
>> the JobMaster for job"
>> slot.request.timeout: 50
>
> # Increase max timeout in a single attempt
>> cluster.registration.max-timeout: 30
>> # Prevent "free slot (TaskSlot)"
>> akka.ask.timeout: 10 min
>> # Prevent "Heartbeat of TaskManager timed out."
>> heartbeat.timeout: 50
>
>
> However, we acknowledge that this is only a temporary dirty fix, which
> is not what we want. It could be seen that during TaskManager registration
> to JobManager, lots of warning messages come out in logs:
>
> No 

Re:Re: PyFlink SQL: select and where both use a UDF, and one of the UDFs has no effect

2020-10-15 Thread whh_960101
import json

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=DataTypes.STRING(), result_type=DataTypes.BOOLEAN())
def udf1(msg):  # udf1 simply filters for the error keyword in the log
    if msg is None:
        return False  # result_type is BOOLEAN, so return a bool rather than ''
    msg_dic = json.loads(msg.strip())
    log = msg_dic.get('log').lower()
    if 'error' in log or 'fail' in log:
        return True
    else:
        return False

@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
def udf2(msg):  # udf2 simply extracts the log from msg
    if msg is None:
        return ''
    msg_dic = json.loads(msg.strip())
    log = msg_dic.get('log')
    return log
I don't think the two UDFs conflict with each other, do they?
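
One way to narrow this down is to exercise the UDF bodies as plain Python functions, outside Flink, before suspecting a conflict between them. The following is a minimal sketch under that assumption: `filter_log` and `extract_log` are hypothetical stand-ins for the bodies of udf1 and udf2 (without the PyFlink decorator), and the sample messages are made up for illustration.

```python
import json


def filter_log(msg):
    """Stand-in for udf1: True when the log mentions 'error' or 'fail'."""
    if msg is None:
        return False  # always a bool, matching a BOOLEAN result type
    log = json.loads(msg.strip()).get('log') or ''
    log = log.lower()
    return 'error' in log or 'fail' in log


def extract_log(msg):
    """Stand-in for udf2: pull the 'log' field out of the JSON message."""
    if msg is None:
        return ''
    return json.loads(msg.strip()).get('log')


samples = [
    '{"log": "Connection FAILED"}',
    '{"log": "all good"}',
    None,
]
# where(...) followed by select(...), written as a plain comprehension
kept = [extract_log(m) for m in samples if filter_log(m)]
```

If the plain functions filter correctly, as they do here, the UDF logic itself is probably not the cause, and the difference between the from_elements source and the Kafka source is the next thing to examine.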








At 2020-10-15 16:57:39, "Xingbo Huang"  wrote:
>Hi,
>We have an example that covers your case [1]; you need to check what the logic of your udf1 looks like.
>
>
>[1]
>https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67
>
>Best,
>Xingbo
>
>whh_960101  wrote on Thu, Oct 15, 2020 at 2:30 PM:
>
>> Hello, my PyFlink code is below, and I have the following problem:
>>
>>
>> source  = st_env.from_path('source')
>> # st_env is a StreamTableEnvironment, source is the Kafka source
>> # udf1 only in the where clause (input_types =DataTypes.STRING(), result_type =
>> DataTypes.BOOLEAN())
>> table =
>> source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>
>>
>> The printed result is filtered correctly.
>>
>>
>> But if I add another UDF, udf2, in the select clause (input_types =DataTypes.STRING(), result_type =
>> DataTypes.STRING()) (it simply turns msg into a new String)
>> table =
>> source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>> the where filter has no effect and all of the data is printed in the end.
>>
>>
>> Putting where first does not work either, and neither does switching to filter:
>> table =
>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>>
>>
>> Can the UDFs in select and where conflict with each other? How can I solve this?
>> I hope you can help. Thanks!


Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

2020-10-15 Thread DONG, Weike
Hi Till and community,

Increasing `kubernetes.jobmanager.cpu` in the configuration alleviates this
issue but does not eliminate it.

After adding DEBUG logs to the internals of *flink-runtime*, we have found
that the culprit is

inetAddress.getCanonicalHostName()

in *org.apache.flink.runtime.taskmanager.TaskManagerLocation#getHostName*
and
*org.apache.flink.runtime.taskmanager.TaskManagerLocation#getFqdnHostName*,
which can take ~ 6 seconds to complete, so the Akka dispatcher(s)
are severely blocked by it.

After commenting out the two methods, this issue seems to be solved
immediately, so I wonder if Flink could provide a configuration parameter
to turn off the DNS reverse lookup process, as it seems that Flink jobs
could run happily without it.

Sincerely,
Weike


On Tue, Oct 13, 2020 at 6:52 PM Till Rohrmann  wrote:

> Hi Weike,
>
> could you try setting kubernetes.jobmanager.cpu: 4 in your
> flink-conf.yaml? I fear that a single CPU is too low for the JobManager
> component.
>
> Cheers,
> Till
>
> On Tue, Oct 13, 2020 at 11:33 AM Till Rohrmann 
> wrote:
>
>> Hi Weike,
>>
>> thanks for posting the logs. I will take a look at them. My suspicion
>> would be that there is some operation blocking the JobMaster's main thread
>> which causes the registrations from the TMs to time out. Maybe the logs
>> allow me to validate/falsify this suspicion.
>>
>> Cheers,
>> Till
>>
>> On Mon, Oct 12, 2020 at 10:43 AM DONG, Weike 
>> wrote:
>>
>>> Hi community,
>>>
>>> I have uploaded the log files of JobManager and TaskManager-1-1 (one of
>>> the 50 TaskManagers) with DEBUG log level and default Flink configuration,
>>> and it clearly shows that TaskManager failed to register with JobManager
>>> after 10 attempts.
>>>
>>> Here is the link:
>>>
>>> JobManager:
>>> https://gist.github.com/kylemeow/740c470d9b5a1ab3552376193920adce
>>>
>>> TaskManager-1-1:
>>> https://gist.github.com/kylemeow/41b9a8fe91975875c40afaf58276c2fe
>>>
>>> Thanks : )
>>>
>>> Best regards,
>>> Weike
>>>
>>>
>>> On Mon, Oct 12, 2020 at 4:14 PM DONG, Weike 
>>> wrote:
>>>
 Hi community,

 Recently we have noticed a strange behavior for Flink jobs on
 Kubernetes per-job mode: when the parallelism increases, the time it takes
 for the TaskManagers to register with *JobManager *becomes abnormally
 long (for a task with parallelism of 50, it could take 60 ~ 120 seconds or
 even longer for the registration attempt), and usually more than 10
 attempts are needed to finish this registration.

 Because of this, we could not submit a job requiring more than 20 slots
 with the default configuration, as the TaskManager would say:


> Registration at JobManager 
> (akka.tcp://flink@myjob-201076.default:6123/user/rpc/jobmanager_2)
> attempt 9 timed out after 25600 ms

 Free slot with allocation id 60d5277e138a94fb73fc6691557001e0 because:
> The slot 60d5277e138a94fb73fc6691557001e0 has timed out.

 Free slot TaskSlot(index:0, state:ALLOCATED, resource profile:
> ResourceProfile{cpuCores=1., taskHeapMemory=1.425gb
> (1530082070 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.340gb
> (1438814063 bytes), networkMemory=343.040mb (359703515 bytes)},
> allocationId: 60d5277e138a94fb73fc6691557001e0, jobId:
> 493cd86e389ccc8f2887e1222903b5ce).
> java.lang.Exception: The slot 60d5277e138a94fb73fc6691557001e0 has
> timed out.


 In order to cope with this issue, we have to change the below
 configuration parameters:

>
> # Prevent "Could not allocate the required slot within slot request
> timeout. Please make sure that the cluster has enough resources. Stopping
> the JobMaster for job"
> slot.request.timeout: 50

 # Increase max timeout in a single attempt
> cluster.registration.max-timeout: 30
> # Prevent "free slot (TaskSlot)"
> akka.ask.timeout: 10 min
> # Prevent "Heartbeat of TaskManager timed out."
> heartbeat.timeout: 50


 However, we acknowledge that this is only a temporary dirty fix, which
 is not what we want. It could be seen that during TaskManager registration
 to JobManager, lots of warning messages come out in logs:

 No hostname could be resolved for the IP address 9.166.0.118, using IP
> address as host name. Local input split assignment (such as for HDFS 
> files)
> may be impacted.


 Initially we thought this was probably the cause (reverse lookup of DNS
 might take up a long time), however we later found that the reverse lookup
 only took less than 1ms, so maybe not because of this.

 Also, we have checked the GC log of both TaskManagers and JobManager,
 and they seem to be perfectly normal, without any signs of pauses. And the
 heartbeats are processed as normal according to the logs.

 Moreover, TaskManagers register quickly with ResourceManager, but then
 

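Aggregation scenario where Kafka topic messages carry incomplete fields

2020-10-15 Thread 史 正超
Hi experts, I have the following scenario:
A Kafka topic has 4 fields: id, field2, field3, field4, where id is the unique identifier.
The problem is that not every message carries all of the fields; only id is always present. I need to aggregate with id, field2, field3, field4 as one dimension.
For example, given the following Kafka messages:
{"id": 1, "field2":"b"}
{"id": 1, "field3":"c", "field4":"d"}
then count(1) by that dimension (group by id, field2, field3, field4) should give this result:
(1,b,  c, d)=> 1

Is there any approach for this kind of requirement?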


Re: PyFlink SQL: select and where both use a UDF, and one of the UDFs has no effect

2020-10-15 Thread Xingbo Huang
Hi,
We have an example that covers your case [1]; you need to check what the logic of your udf1 looks like.


[1]
https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67

Best,
Xingbo

whh_960101  wrote on Thu, Oct 15, 2020 at 2:30 PM:

> Hello, my PyFlink code is below, and I have the following problem:
>
>
> source  = st_env.from_path('source')
> # st_env is a StreamTableEnvironment, source is the Kafka source
> # udf1 only in the where clause (input_types =DataTypes.STRING(), result_type =
> DataTypes.BOOLEAN())
> table =
> source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>
>
> The printed result is filtered correctly.
>
>
> But if I add another UDF, udf2, in the select clause (input_types =DataTypes.STRING(), result_type =
> DataTypes.STRING()) (it simply turns msg into a new String)
> table =
> source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
> the where filter has no effect and all of the data is printed in the end.
>
>
> Putting where first does not work either, and neither does switching to filter:
> table =
> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>
>
> Can the UDFs in select and where conflict with each other? How can I solve this?
> I hope you can help. Thanks!


Re: PyFlink: how to print the contents of a Table object for debugging

2020-10-15 Thread Xingbo Huang
Hi,
If you want to output the result of a table, there are two convenient ways:
1. table.to_pandas()
2. Use the print connector; see [1]

Also, if you are interested in PyFlink, take a look at this doc [2], which can help you get started quickly.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/print.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html

Best,
Xingbo

whh_960101  wrote on Thu, Oct 15, 2020 at 4:39 PM:

> Hello, my PyFlink code is below, and I have the following problem:
>
>
> source  = st_env.from_path('source')
> # st_env is a StreamTableEnvironment, source is the Kafka source
> # udf1 (input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN())
> table = source.select("msg").where(udf1(msg)=True)
>
>
> When stepping through in a debugger, print(table) gives 0x7f888fb2cef0>
> Does PyFlink have a way to convert a Table into a printable form?
> I hope you can help. Thanks!


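PyFlink: how to print the contents of a Table object for debugging

2020-10-15 Thread whh_960101
Hello, my PyFlink code is below, and I have the following problem:


source  = st_env.from_path('source')
# st_env is a StreamTableEnvironment, source is the Kafka source
# udf1 (input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN())
table = source.select("msg").where(udf1(msg)=True)


When stepping through in a debugger, the result of print(table) is
Does PyFlink have a way to convert a Table into a printable form?
I hope you can help. Thanks!
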

Re: what's the datasets used in flink sql document?

2020-10-15 Thread Piotr Nowojski
Hi,

The examples in the documentation are not fully functional. They assume
(like in this case), that you would have an already predefined table
orders, with the required fields. As I mentioned before, there are working
examples available and you can also read the documentation on how to
register tables on your own (both linked in my previous messages).

Best
Piotrek

śr., 14 paź 2020 o 18:12 大森林  napisał(a):

> Many thanks for your replies.
>
> I mean: where is the "France revenue" data
> in the following document?
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html
>
> Thanks for your help~
>
> -- Original Message --
> *From:* "Piotr Nowojski" ;
> *Sent:* Wednesday, October 14, 2020, 10:20 PM
> *To:* "大森林";
> *Cc:* "user";
> *Subject:* Re: what's the datasets used in flink sql document?
>
> Hi,
>
> Can you link what document do you have in mind? The documentation [1]? I
> don't think so.
>
> There are working examples, located in the binary distribution under the
> `examples/table/` directory. Their code is available in the repository [2].
>
> Best regards,
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#query-a-table
> [2]
> https://github.com/apache/flink/tree/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics
>
> śr., 14 paź 2020 o 15:55 大森林  napisał(a):
>
>> Sorry that I did not make it clear.
>>
>> I mean:
>> Is there a dataset that can be downloaded
>> and that works with all the examples in the document?
>>
>> Thanks for your help
>>
>> -- Original Message --
>> *From:* "Piotr Nowojski" ;
>> *Sent:* Wednesday, October 14, 2020, 9:52 PM
>> *To:* "大森林";
>> *Cc:* "user";
>> *Subject:* Re: what's the datasets used in flink sql document?
>>
>> Hi,
>>
>> It depends how you defined `orders` in your example. For example here [1]
>>
>> > Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime)
>>
>> `orders` is obtained from the environment, from a table registered under
>> the name "Orders". You would need to first register such table, or register
>> a catalog with such table [2]
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#overview--examples
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-tableenvironment
>>
>> śr., 14 paź 2020 o 15:15 大森林  napisał(a):
>>
>>> Could anyone tell me
>>> what's the datasets used in flink sql document?
>>>
>>> For sql like:
>>>
>>> val revenue = orders
>>>   .filter($"cCountry" === "FRANCE")
>>>   .groupBy($"cID", $"cName")
>>>   .select($"cID", $"cName", $"revenue".sum AS "revSum")
>>>
>>>
>>> Thanks for your help
>>>
>>>


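Re: How long does the Kafka table connector retain data

2020-10-15 Thread Xiao Xu
Flink does not retain the data. The data is all persisted to disk in Kafka, and Flink reads it from Kafka according to the offset. You can set the retention time
in Kafka.

marble.zh...@coinflex.com.INVALID 
wrote on Wed, Oct 14, 2020 at 4:37 PM:

> Hello, for data ingested with the Kafka table
>
> connector, how long is it kept on the Flink side? I did not see a setting for this in the option list. If it is kept too long, memory will blow up. For example, I only want to keep half an hour of data, and anything older can be purged.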
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


????????????????????????????????

2020-10-15 Thread ????????????
Hi,All


??flink??1??12:00~12:59??flink
??
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  .timeWindow(Time.hours(1))



Streaming File Sink cannot generate _SUCCESS tag files

2020-10-15 Thread highfei2011
Hi, everyone!
  I'm currently running into a problem: after consuming Kafka data with 
Flink 1.11.2 and writing it to HDFS with the Streaming File Sink and a 
BucketAssigner, the _SUCCESS tag file is not generated by default.
  I have added the following to the configuration:


val hadoopConf = new Configuration()
hadoopConf.set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "true")


But there is still no _SUCCESS file in the output directory. Why is 
generating _SUCCESS files not supported?


Thank you.




Best,
Yang

Re: Blob server related, file not found

2020-10-15 Thread janick
I ran into the same problem on 1.9.3. Did the original poster solve it?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: PyFlink :: Bootstrap UDF function

2020-10-15 Thread Sharipov, Rinat
Hi Dian !

Thx a lot for your reply, it's very helpful for us.



чт, 15 окт. 2020 г. в 04:30, Dian Fu :

> Hi Rinat,
>
> It's called in single thread fashion and so there is no need for the
> synchronization.
>
> Besides, there is a pair of open/close methods in the ScalarFunction and
> you could also override them and perform the initialization work in the
> open method.
>
> Regards,
> Dian
>
> On Oct 15, 2020, at 3:19 AM, Sharipov, Rinat  wrote:
>
> Hi mates !
>
> I keep moving in my research of new features of PyFlink and I'm really
> excited about that functionality.
> My main goal is to understand how to integrate our ML registry, powered by
> ML Flow and PyFlink jobs and what restrictions we have.
>
> I need to bootstrap the UDF function on its startup, when it's
> instantiated in the Apache Beam process, but I don't know whether it's called
> by PyFlink in a single-threaded fashion or shared among multiple threads. In
> other words, I want to know whether I should care about synchronization of my
> bootstrap logic or not.
>
> Here is a code example of my UDF function:
>
> class MyFunction(ScalarFunction):
>
>     def __init__(self):
>         self.initialized = False
>
>     def __bootstrap(self):
>         return "bootstrapped"
>
>     def eval(self, urls):
>         if self.initialized:
>             self.__bootstrap()
>         return "my-result"
>
> my_function = udf(MyFunction(), [DataTypes.ARRAY(DataTypes.STRING())], DataTypes.STRING())
>
>
> Thx a lot for your help !
>
>
>
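
The open-method suggestion from the reply quoted above can be sketched as follows. This is a minimal, assumption-laden illustration in plain Python: `BootstrappedFunction` is a hypothetical stand-in that mimics the open/eval lifecycle without importing PyFlink, and `_bootstrap` is a placeholder for whatever one-time work (for example loading a model) is needed.

```python
class BootstrappedFunction:
    """Stand-in for a ScalarFunction subclass; not tied to PyFlink here."""

    def __init__(self):
        self.model = None
        self.bootstrap_calls = 0

    def open(self, function_context=None):
        # One-time initialization, done before any call to eval.
        self.model = self._bootstrap()

    def _bootstrap(self):
        self.bootstrap_calls += 1
        return "bootstrapped"

    def eval(self, urls):
        # No initialized flag or lock needed: open() has already run.
        return "my-result"


fn = BootstrappedFunction()
fn.open()  # the runtime would call this once per function instance
results = [fn.eval(["a"]), fn.eval(["b"])]
```

Moving the bootstrap into open keeps eval free of the initialized check, which matches the statement that calls are single-threaded and need no synchronization.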


PyFlink SQL: select and where both use a UDF, and one of the UDFs has no effect

2020-10-15 Thread whh_960101
Hello, my PyFlink code is below, and I have the following problem:


source  = st_env.from_path('source')
# st_env is a StreamTableEnvironment, source is the Kafka source
# udf1 only in the where clause (input_types =DataTypes.STRING(), result_type = 
DataTypes.BOOLEAN())
table = 
source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()


The printed result is filtered correctly.


But if I add another UDF, udf2, in the select clause (input_types =DataTypes.STRING(), result_type = 
DataTypes.STRING()) (it simply turns msg into a new String)
table = 
source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
the where filter has no effect and all of the data is printed in the end.


Putting where first does not work either, and neither does switching to filter:
table = 
source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()


Can the UDFs in select and where conflict with each other? How can I solve this?
I hope you can help. Thanks!


Re: Processing single events for minimum latency

2020-10-15 Thread Piotr Nowojski
No problem :)

Piotrek

czw., 15 paź 2020 o 08:18 Pankaj Chand 
napisał(a):

> Thank you for the quick and informative reply, Piotrek!
>
> On Thu, Oct 15, 2020 at 2:09 AM Piotr Nowojski 
> wrote:
>
>> Hi Pankaj,
>>
>> Yes, you can trigger a window per each element, take a look at the Window
>> Triggers [1].
>>
>> Flink is always processing all records immediately. The only things that
>> can delay processing elements are:
>> - buffering elements on the operator's state (vide WindowOperator)
>> - buffer-timeout (but that's on the output, so it's not delaying
>> processing per se)
>> - back pressure
>> - exactly-once checkpointing (especially under the back pressure)
>>
>> > Also, is there any way I can change the execution.buffer-timeout or
>> setbuffertimeout(milliseconds) dynamically while the job is running?
>>
>> No, sorry it's not possible :(
>>
>> Best,
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers
>>
>> czw., 15 paź 2020 o 01:55 Pankaj Chand 
>> napisał(a):
>>
>>> Hi Piotrek,
>>>
>>> Thank you for replying! I want to process each record as soon as it is
>>> ingested (or reaches an operator) without waiting for a window for records
>>> to arrive. However, by not using windows, I am not sure if each record gets
>>> emitted immediately upon processing.
>>>
>>> > You still can use windowing, but you might want to emit updated value
>>> of the window per every processed record.
>>>
>>> How do I do this?
>>>
>>> Also, is there any way I can change the execution.buffer-timeout or
>>> setbuffertimeout(milliseconds) dynamically while the job is running?
>>>
>>> Thank you,
>>>
>>> Pankaj
>>>
>>> On Wed, Oct 14, 2020 at 9:42 AM Piotr Nowojski 
>>> wrote:
>>>
 Hi Pankaj,

 I'm not entirely sure if I understand your question.

 If you want to minimize latency, you should avoid using windows or any
 other operators, that are buffering data for long periods of time. You
 still can use windowing, but you might want to emit updated value of the
 window per every processed record.

 Other than that, you might also want to reduce
 `execution.buffer-timeout` from the default value of 100ms down to 1ms, or
 0ms [1]. Is this what you are looking for?

 Piotrek

 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

 śr., 14 paź 2020 o 12:38 Pankaj Chand 
 napisał(a):

> Hi all,
>
> What is the recommended way to make a Flink job that processes each
> event individually as soon as it comes and without waiting for a
> window, in order to minimize latency in the entire DAG of operators?
>
> For example, here is some sample WordCount code (without windows),
> followed by some known ways:
>
>  DataStream wordCounts = text
> .flatMap(new FlatMapFunction())
> .keyBy("word")
> .reduce(new ReduceFunction());
>
>
>1. Don't include any TimeWindow/CountWindow function (does this
>actually achieve what I want?)
>2. Use a CountWindow with a count of 1
>3. Make a Trigger that fires to process each event when it comes in
>
> I think the above methods only affect processing and latency at the level
> of a single operator, but do not affect the latency of an event in the
> entire Flink job's DAG, since they do not change the buffer timeout
> value.
>
> Thanks,
>
> Pankaj
>



Re: Broadcasting control messages to a sink

2020-10-15 Thread Piotr Nowojski
Hi Julian,

I think the problem is that BroadcastProcessFunction and SinkFunction will
be executed by separate operators, so they won't be able to share state. If
you can not split your logic into two, I think you will have to workaround
this problem differently.

1. Relay on operator chaining and wire both of them together.

If you set up your BroadcastProcessFunction and SinkFunction one after
another, with the same parallelism, with the default chaining, and without
any rebalance/keyBy in between, you can be sure they will be chained
together. The record type between BroadcastProcessFunction and SinkFunction
can then be a union type of a) your actual payload and b) the broadcast
message. Upon initialization, before processing the first record, if you
have any broadcast state, you would also need to forward its content to the
downstream SinkFunction.
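
As a rough illustration of the union type idea, here is a minimal sketch in
plain Java with no Flink classes involved. The names Envelope,
SchemaAwareWriter and UnionTypeSketch are hypothetical, and the schema and
payload are simplified to Strings. In a real job the accept() logic would
live inside the sink's invoke() method, and the upstream
BroadcastProcessFunction would emit one control envelope per schema update
on every parallel instance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical tagged union: exactly one of payload / control is non-null. */
final class Envelope<P, C> {
    private final P payload;
    private final C control;

    private Envelope(P payload, C control) {
        this.payload = payload;
        this.control = control;
    }

    static <P, C> Envelope<P, C> ofPayload(P payload) {
        return new Envelope<>(Objects.requireNonNull(payload), null);
    }

    static <P, C> Envelope<P, C> ofControl(C control) {
        return new Envelope<>(null, Objects.requireNonNull(control));
    }

    boolean isControl() { return control != null; }
    P payload() { return payload; }
    C control() { return control; }
}

/** Hypothetical sink-side logic: control envelopes update the schema, payloads get written. */
final class SchemaAwareWriter {
    private String currentSchema;                        // last schema seen in a control envelope
    private final List<String> written = new ArrayList<>();

    void accept(Envelope<String, String> e) {
        if (e.isControl()) {
            currentSchema = e.control();                 // no per-record schema comparison needed
            return;
        }
        if (currentSchema == null) {
            throw new IllegalStateException("payload arrived before any schema");
        }
        written.add(currentSchema + ":" + e.payload());  // stand-in for the real write
    }

    List<String> written() { return written; }
}

final class UnionTypeSketch {
    public static void main(String[] args) {
        SchemaAwareWriter writer = new SchemaAwareWriter();
        writer.accept(Envelope.ofControl("v1"));
        writer.accept(Envelope.ofPayload("a"));
        writer.accept(Envelope.ofControl("v2"));
        writer.accept(Envelope.ofPayload("b"));
        System.out.println(writer.written());            // prints [v1:a, v2:b]
    }
}
```

The point of the design is that the sink only reacts to explicit control
envelopes, so it does not have to compare a schema on every record.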

2. Another solution is that maybe you can try to embed SinkFunction inside
the BroadcastProcessFunction? This will require some careful proxying and
wrapping calls.
3. As always, you can also write a custom operator that will be doing the
same thing.

For 2. and 3., I'm not entirely sure whether there are gotchas that I
haven't thought through (state handling?), so if you can make 1. work for
you, it will probably be the safer route.

Best,
Piotrek




On Wed, 14 Oct 2020 at 19:42, Jaffe, Julian wrote:

> Thanks for the suggestion Piotr!
>
>
>
> The problem is that the sink needs to have access to the schema (so that
> it can write the schema only once per file instead of record) and thus
> needs to know when the schema has been updated. In this proposed
> architecture, I think the sink would still need to check each record to see
> if the current schema matches the new record or not? The main problem I
> encountered when playing around with broadcast state was that I couldn’t
> figure out how to access the broadcast state within the sink, but perhaps I
> just haven’t thought about it the right way. I’ll meditate on the docs
> further  
>
>
>
> Julian
>
>
>
> *From: *Piotr Nowojski 
> *Date: *Wednesday, October 14, 2020 at 6:35 AM
> *To: *"Jaffe, Julian" 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Broadcasting control messages to a sink
>
>
>
> Hi Julian,
>
>
>
> Have you seen Broadcast State [1]? I have never used it personally, but it
> sounds like something you want. Maybe your job should look like:
>
>
>
> 1. read raw messages from Kafka, without using the schema
>
> 2. read schema changes and broadcast them to 3. and 5.
>
> 3. deserialize kafka records in BroadcastProcessFunction by using combined
> 1. and 2.
>
> 4. do your logic o
>
> 5. serialize records using schema in another BroadcastProcessFunction by
> using combined 4. and 2.
>
> 6. write raw records using BucketingSink
>
> ?
>
>
>
> Best,
>
> Piotrek
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
> 
>
>
>
> On Wed, 14 Oct 2020 at 11:01, Jaffe, Julian wrote:
>
> Hey all,
>
>
>
> I’m building a Flink app that pulls in messages from a Kafka topic and
> writes them out to disk using a custom bucketed sink. Each message needs to
> be parsed using a schema that is also needed when writing in the sink. This
> schema is read from a remote file on a distributed file system (it could
> also be fetched from a service). The schema will be updated very
> infrequently.
>
>
>
> In order to support schema evolution, I have created a custom source that
> occasionally polls for updates and, if it finds one, parses the new schema
> and sends a message containing the serialized schema. I’ve connected these
> two streams and then use a RichCoFlatMapFunction to flatten them back into
> a single output stream (schema events get used to update the parser,
> messages get parsed using the parser and emitted).
>
>
>
> However, I need some way to communicate the updated schema to every task
> of the sink. Simply emitting a control message that is ignored when writing
> to disk means that only one sink partition will receive the message and
> thus update the schema. I thought about sending the control message as side
> output and then broadcasting the resulting stream to the sink alongside the
> processed event input but I couldn’t figure out a way to do so. For now,
> I’m bundling the schema used to parse each event with the event, storing
> the schema in the sink, and then checking every event’s schema against the
> stored schema but this is fairly inefficient. Also, I’d like to eventually
> increase the types of control messages I can send to the sink, some of
> which may not be idempotent. Is there a better way to 

Re: Processing single events for minimum latency

2020-10-15 Thread Pankaj Chand
Thank you for the quick and informative reply, Piotrek!

On Thu, Oct 15, 2020 at 2:09 AM Piotr Nowojski  wrote:

> Hi Pankaj,
>
> Yes, you can trigger a window for each element; take a look at Window
> Triggers [1].
>
> Flink always processes records immediately. The only things that can
> delay processing elements are:
> - buffering elements in the operator's state (see WindowOperator)
> - buffer-timeout (but that's on the output, so it's not delaying
> processing per se)
> - back pressure
> - exactly-once checkpointing (especially under back pressure)
>
> > Also, is there any way I can change the execution.buffer-timeout or
> > setbuffertimeout(milliseconds) dynamically while the job is running?
>
> No, sorry it's not possible :(
>
> Best,
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers
>
> On Thu, 15 Oct 2020 at 01:55, Pankaj Chand wrote:
>
>> Hi Piotrek,
>>
>> Thank you for replying! I want to process each record as soon as it is
>> ingested (or reaches an operator) without waiting for a window for records
>> to arrive. However, by not using windows, I am not sure if each record gets
>> emitted immediately upon processing.
>>
>> > You still can use windowing, but you might want to emit updated value
>> > of the window per every processed record.
>>
>> How do I do this?
>>
>> Also, is there any way I can change the execution.buffer-timeout or
>> setbuffertimeout(milliseconds) dynamically while the job is running?
>>
>> Thank you,
>>
>> Pankaj
>>
>> On Wed, Oct 14, 2020 at 9:42 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi Pankaj,
>>>
>>> I'm not entirely sure if I understand your question.
>>>
>>> If you want to minimize latency, you should avoid windows and any other
>>> operators that buffer data for long periods of time. You can still use
>>> windowing, but you might want to emit the updated value of the window for
>>> every processed record.
>>>
>>> Other than that, you might also want to reduce
>>> `execution.buffer-timeout` from the default value of 100ms down to 1ms, or
>>> 0ms [1]. Is this what you are looking for?
>>>
>>> Piotrek
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>>>
>>> On Wed, 14 Oct 2020 at 12:38, Pankaj Chand wrote:
>>>
>>>> Hi all,
>>>>
>>>> What is the recommended way to make a Flink job that processes each
>>>> event individually as soon as it comes and without waiting for a
>>>> window, in order to minimize latency in the entire DAG of operators?
>>>>
>>>> For example, here is some sample WordCount code (without windows),
>>>> followed by some known ways:
>>>>
>>>> DataStream wordCounts = text
>>>>     .flatMap(new FlatMapFunction())
>>>>     .keyBy("word")
>>>>     .reduce(new ReduceFunction());
>>>>
>>>>
>>>> 1. Don't include any TimeWindow/CountWindow function (does this
>>>>    actually achieve what I want?)
>>>> 2. Use a CountWindow with a count of 1
>>>> 3. Make a Trigger that fires to process each event when it comes in
>>>>
>>>> I think the above methods only affect latency within a single operator;
>>>> they do not affect the latency of an event across the entire Flink job's
>>>> DAG, since they do not change the buffer timeout value.
>>>>
>>>> Thanks,
>>>>
>>>> Pankaj

>>>


Re: Processing single events for minimum latency

2020-10-15 Thread Piotr Nowojski
Hi Pankaj,

Yes, you can trigger a window for each element; take a look at Window
Triggers [1].

Flink always processes records immediately. The only things that can delay
processing elements are:
- buffering elements in the operator's state (see WindowOperator)
- buffer-timeout (but that's on the output, so it's not delaying processing
per se)
- back pressure
- exactly-once checkpointing (especially under back pressure)
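
To make the buffer-timeout point more concrete, here is a toy model in plain
Java. It is only a sketch of the idea and not Flink's actual implementation;
ToyOutputBuffer and BufferTimeoutSketch are hypothetical names, and time is
passed in explicitly so the behaviour is deterministic. In a real job the
timeout is set through `execution.buffer-timeout` or
StreamExecutionEnvironment#setBufferTimeout(long).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of an output buffer with a flush timeout (not Flink's implementation). */
final class ToyOutputBuffer {
    private final int capacity;        // flush when this many records are pending
    private final long timeoutMs;      // 0 means: flush after every record
    private final List<String> pending = new ArrayList<>();
    private long lastFlushMs;

    ToyOutputBuffer(int capacity, long timeoutMs, long startMs) {
        this.capacity = capacity;
        this.timeoutMs = timeoutMs;
        this.lastFlushMs = startMs;
    }

    /** The record is processed right away; only sending it downstream may wait. */
    List<String> add(String record, long nowMs) {
        pending.add(record);
        if (timeoutMs == 0 || pending.size() >= capacity) {
            return flush(nowMs);
        }
        return new ArrayList<>();
    }

    /** Periodic check, standing in for a background flusher. */
    List<String> tick(long nowMs) {
        if (!pending.isEmpty() && nowMs - lastFlushMs >= timeoutMs) {
            return flush(nowMs);
        }
        return new ArrayList<>();
    }

    private List<String> flush(long nowMs) {
        List<String> out = new ArrayList<>(pending);
        pending.clear();
        lastFlushMs = nowMs;
        return out;
    }
}

final class BufferTimeoutSketch {
    public static void main(String[] args) {
        ToyOutputBuffer slow = new ToyOutputBuffer(1000, 100, 0);
        System.out.println(slow.add("a", 0));   // prints []  (record waits in the buffer)
        System.out.println(slow.tick(100));     // prints [a] (sent once the timeout has passed)

        ToyOutputBuffer fast = new ToyOutputBuffer(1000, 0, 0);
        System.out.println(fast.add("a", 0));   // prints [a] (sent immediately)
    }
}
```

In this model a lower timeout trades throughput (fewer records per flush)
for lower end-to-end latency, which is the trade-off the setting controls.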

> Also, is there any way I can change the execution.buffer-timeout or
> setbuffertimeout(milliseconds) dynamically while the job is running?

No, sorry it's not possible :(

Best,
Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers

On Thu, 15 Oct 2020 at 01:55, Pankaj Chand wrote:

> Hi Piotrek,
>
> Thank you for replying! I want to process each record as soon as it is
> ingested (or reaches an operator) without waiting for a window for records
> to arrive. However, by not using windows, I am not sure if each record gets
> emitted immediately upon processing.
>
> > You still can use windowing, but you might want to emit updated value of
> > the window per every processed record.
>
> How do I do this?
>
> Also, is there any way I can change the execution.buffer-timeout or
> setbuffertimeout(milliseconds) dynamically while the job is running?
>
> Thank you,
>
> Pankaj
>
> On Wed, Oct 14, 2020 at 9:42 AM Piotr Nowojski 
> wrote:
>
>> Hi Pankaj,
>>
>> I'm not entirely sure if I understand your question.
>>
>> If you want to minimize latency, you should avoid windows and any other
>> operators that buffer data for long periods of time. You can still use
>> windowing, but you might want to emit the updated value of the window for
>> every processed record.
>>
>> Other than that, you might also want to reduce `execution.buffer-timeout`
>> from the default value of 100ms down to 1ms, or 0ms [1]. Is this what you
>> are looking for?
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>>
>> On Wed, 14 Oct 2020 at 12:38, Pankaj Chand wrote:
>>
>>> Hi all,
>>>
>>> What is the recommended way to make a Flink job that processes each
>>> event individually as soon as it comes and without waiting for a
>>> window, in order to minimize latency in the entire DAG of operators?
>>>
>>> For example, here is some sample WordCount code (without windows),
>>> followed by some known ways:
>>>
>>> DataStream wordCounts = text
>>>     .flatMap(new FlatMapFunction())
>>>     .keyBy("word")
>>>     .reduce(new ReduceFunction());
>>>
>>>
>>> 1. Don't include any TimeWindow/CountWindow function (does this
>>>    actually achieve what I want?)
>>> 2. Use a CountWindow with a count of 1
>>> 3. Make a Trigger that fires to process each event when it comes in
>>>
>>> I think the above methods only affect latency within a single operator;
>>> they do not affect the latency of an event across the entire Flink job's
>>> DAG, since they do not change the buffer timeout value.
>>>
>>> Thanks,
>>>
>>> Pankaj
>>>
>>