Re: How does Flink plugin system work?

2023-01-02 Thread Matthias Pohl via user
Yes, Ruibin confirmed in a private message that using the factory class
works. But thanks for digging into it once more, Yanfei. I failed to
consider in my previous message that the plugin classes are loaded using
their own class loaders, which, indeed, can result in a
ClassNotFoundException being thrown.

Best,
Matthias

On Tue, Jan 3, 2023 at 4:45 AM Yanfei Lei  wrote:

> Hi Ruibin,
>
> "metrics.reporter.prom.class" is deprecated in 1.16, maybe "
> metrics.reporter.prom.factory.class"[1] can solve your problem.
> After reading the related code[2], I think the root cause is that  "
> metrics.reporter.prom.class" would load the code via flink's classpath
> instead of MetricReporterFactory, due to "Plugins cannot access classes
> from other plugins or from Flink that have not been specifically
> whitelisted"[3], so ClassNotFoundException is thrown.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/metric_reporters/#prometheus
> [2]
> https://github.com/apache/flink/blob/release-1.16/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java#L457
> [3]
> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/
>
> Matthias Pohl via user wrote on Mon, Jan 2, 2023 at 20:27:
>
>> Hi Ruibin,
>> could you switch to using the currently supported way for instantiating
>> reporters using the factory configuration parameter [1][2]?
>>
>> Based on the ClassNotFoundException, your suspicion might be right that
>> the plugin didn't make it onto the classpath. Could you share the
>> startup logs of the JM and TMs? That might help in getting a bit more
>> context on what's going on. Your approach of integrating the reporter
>> through the plugin system [3] sounds about right as far as I can see.
>>
>> Matthias
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#factory-class
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#prometheus
>> [3]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/
>>
>> On Fri, Dec 30, 2022 at 11:42 AM Ruibin Xing  wrote:
>>
>>> Hi community,
>>>
>>> I am having difficulty understanding the Flink plugin system. I am
>>> attempting to enable the Prometheus exporter with the official Flink image
>>> 1.16.0, but I am experiencing issues with library dependencies. According
>>> to the plugin documentation (
>>> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/),
>>> as long as the library is located in the /opt/flink/plugins/
>>> directory, Flink should automatically load it, similar to how it loads
>>> libraries in the /opt/flink/lib directory. However, Flink does not seem to
>>> detect the plugin.
>>>
>>> Here is the directory structure for /opt/flink:
>>> > tree /opt/flink
>>> .
>>> 
>>> ├── plugins
>>> │   ├── metrics-prometheus
>>> │   │   └── flink-metrics-prometheus-1.16.0.jar
>>> ...
>>>
>>> And here is the related Flink configuration:
>>> > metrics.reporter.prom.class:
>>> org.apache.flink.metrics.prometheus.PrometheusReporter
>>>
>>> The error logs in the task manager show the following:
>>> 2022-12-30 10:03:55,840 WARN
>>>  org.apache.flink.runtime.metrics.ReporterSetup   [] - The
>>> reporter configuration of 'prom' configures the reporter class, which is a
>>> deprecated approach to configure reporters. Please configure a factory
>>> class instead: 'metrics.reporter.prom.factory.class: ' to
>>> ensure that the configuration continues to work with future versions.
>>> 2022-12-30 10:03:55,841 ERROR
>>> org.apache.flink.runtime.metrics.ReporterSetup   [] - Could not
>>> instantiate metrics reporter prom. Metrics might not be exposed/reported.
>>> java.lang.ClassNotFoundException:
>>> org.apache.flink.metrics.prometheus.PrometheusReporter
>>> at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
>>> ~[?:?]
>>> at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown
>>> Source) ~[?:?]
>>> at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>> at java.lang.Class.forName0(Native Method) ~[?:?]
>>> at java.lang.Class.forName(Unknown Source) ~[?:?]
>>> at
>>> org.apache.flink.runtime.metrics.ReporterSetup.loadViaReflection(ReporterSetup.java:456)
>>> ~[flink-dist-1.16.0.jar:1.16.0]
>>> at
>>> org.apache.flink.runtime.metrics.ReporterSetup.loadReporter(ReporterSetup.java:409)
>>> ~[flink-dist-1.16.0.jar:1.16.0]
>>> at
>>> org.apache.flink.runtime.metrics.ReporterSetup.setupReporters(ReporterSetup.java:328)
>>> ~[flink-dist-1.16.0.jar:1.16.0]
>>> at
>>> org.apache.flink.runtime.metrics.ReporterSetup.fromConfiguration(ReporterSetup.java:209)
>>> ~[flink-dist-1.16.0.jar:1.16.0]
>>>
>>> The Java commands for Flink process:
>>> flink  1  3.0  4.6 2168308 765936 ?  Ssl  10:03   1:08
>>> /opt/java/openjdk/bin/java -XX:+UseG1GC -Xmx697932173 -Xms697932173
>>> 

Unsubscribe

2023-01-02 Thread 邱钺
Unsubscribe


Re: How does Flink plugin system work?

2023-01-02 Thread Yanfei Lei
Hi Ruibin,

"metrics.reporter.prom.class" is deprecated in 1.16, maybe "
metrics.reporter.prom.factory.class"[1] can solve your problem.
After reading the related code[2], I think the root cause is that  "
metrics.reporter.prom.class" would load the code via flink's classpath
instead of MetricReporterFactory, due to "Plugins cannot access classes
from other plugins or from Flink that have not been specifically
whitelisted"[3], so ClassNotFoundException is thrown.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/metric_reporters/#prometheus
[2]
https://github.com/apache/flink/blob/release-1.16/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java#L457
[3]
https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/
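
For reference, a minimal flink-conf.yaml sketch of the factory-based
configuration could look like this (the reporter name "prom" is taken from
Ruibin's setup; the port line is optional and shows the reporter's default):

metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249

With the factory option the reporter is resolved through the plugin
mechanism, so flink-metrics-prometheus-1.16.0.jar can stay under
/opt/flink/plugins/metrics-prometheus/ without being added to the main
classpath.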

Matthias Pohl via user wrote on Mon, Jan 2, 2023 at 20:27:

> Hi Ruibin,
> could you switch to using the currently supported way for instantiating
> reporters using the factory configuration parameter [1][2]?
>
> Based on the ClassNotFoundException, your suspicion might be right that
> the plugin didn't make it onto the classpath. Could you share the
> startup logs of the JM and TMs? That might help in getting a bit more
> context on what's going on. Your approach of integrating the reporter
> through the plugin system [3] sounds about right as far as I can see.
>
> Matthias
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#factory-class
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#prometheus
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/
>
> On Fri, Dec 30, 2022 at 11:42 AM Ruibin Xing  wrote:
>
>> Hi community,
>>
>> I am having difficulty understanding the Flink plugin system. I am
>> attempting to enable the Prometheus exporter with the official Flink image
>> 1.16.0, but I am experiencing issues with library dependencies. According
>> to the plugin documentation (
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/),
>> as long as the library is located in the /opt/flink/plugins/
>> directory, Flink should automatically load it, similar to how it loads
>> libraries in the /opt/flink/lib directory. However, Flink does not seem to
>> detect the plugin.
>>
>> Here is the directory structure for /opt/flink:
>> > tree /opt/flink
>> .
>> 
>> ├── plugins
>> │   ├── metrics-prometheus
>> │   │   └── flink-metrics-prometheus-1.16.0.jar
>> ...
>>
>> And here is the related Flink configuration:
>> > metrics.reporter.prom.class:
>> org.apache.flink.metrics.prometheus.PrometheusReporter
>>
>> The error logs in the task manager show the following:
>> 2022-12-30 10:03:55,840 WARN
>>  org.apache.flink.runtime.metrics.ReporterSetup   [] - The
>> reporter configuration of 'prom' configures the reporter class, which is a
>> deprecated approach to configure reporters. Please configure a factory
>> class instead: 'metrics.reporter.prom.factory.class: ' to
>> ensure that the configuration continues to work with future versions.
>> 2022-12-30 10:03:55,841 ERROR
>> org.apache.flink.runtime.metrics.ReporterSetup   [] - Could not
>> instantiate metrics reporter prom. Metrics might not be exposed/reported.
>> java.lang.ClassNotFoundException:
>> org.apache.flink.metrics.prometheus.PrometheusReporter
>> at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
>> at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown
>> Source) ~[?:?]
>> at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>> at java.lang.Class.forName0(Native Method) ~[?:?]
>> at java.lang.Class.forName(Unknown Source) ~[?:?]
>> at
>> org.apache.flink.runtime.metrics.ReporterSetup.loadViaReflection(ReporterSetup.java:456)
>> ~[flink-dist-1.16.0.jar:1.16.0]
>> at
>> org.apache.flink.runtime.metrics.ReporterSetup.loadReporter(ReporterSetup.java:409)
>> ~[flink-dist-1.16.0.jar:1.16.0]
>> at
>> org.apache.flink.runtime.metrics.ReporterSetup.setupReporters(ReporterSetup.java:328)
>> ~[flink-dist-1.16.0.jar:1.16.0]
>> at
>> org.apache.flink.runtime.metrics.ReporterSetup.fromConfiguration(ReporterSetup.java:209)
>> ~[flink-dist-1.16.0.jar:1.16.0]
>>
>> The Java commands for Flink process:
>> flink  1  3.0  4.6 2168308 765936 ?  Ssl  10:03   1:08
>> /opt/java/openjdk/bin/java -XX:+UseG1GC -Xmx697932173 -Xms697932173
>> -XX:MaxDirectMemorySize=300647712 -XX:MaxMetaspaceSize=268435456
>> -Dlog.file=/opt/flink/log/flink--kubernetes-taskmanager-0-checkpoint-ha-example-taskmanager-1-1.log
>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
>> -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties
>> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
>> -classpath
>> 

When using WindowDeduplicate, data is not written to the sink with the expected timeliness

2023-01-02 Thread lk_hadoop
Hi all:

I have a streaming table and want to use WindowDeduplicate for per-window deduplication. The code is as follows:

TableResult result = tEnv.executeSql("select * from " +
 "(select ddate, userMAC, bssid, dtime, userName, apName, action, ssid, rawValue, ROW_NUMBER() OVER (" +
 "PARTITION BY window_start, window_end, userName ORDER BY eventTime DESC" +
 ") as row_num from " +
 "TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime), INTERVAL '10' SECONDS))" +
 ") where row_num <= 1");

result.print();

However, no data is printed to the console, and I noticed that the checkpoint size keeps growing.

When I run the following code instead:

TableResult result = tEnv.executeSql("select * from TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime), INTERVAL '10' SECONDS))");

result.print();

the console prints data normally.

When using WindowDeduplicate, it seems records are only written to the sink once a certain amount of data has accumulated.

Does anyone know what causes this?

WindowDeduplicate didn't write results to the sink with the expected timeliness

2023-01-02 Thread lk_hadoop
Hi all,
   I'm using Flink 1.15.3, and I want to use WindowDeduplicate to drop
duplicate records.
   My stream table schema is:
(
  `userName` STRING,
  `userMAC` STRING,
  `bssid` STRING,
  `ssid` STRING,
  `apName` STRING,
  `radioID` STRING,
  `vlanid` STRING,
  `action` STRING,
  `ddate` STRING,
  `dtime` STRING,
  `rawValue` STRING,
  `region` STRING,
  `eventTime` TIMESTAMP(3) *ROWTIME*
)
 When I try the code below:
TableResult result = tEnv.executeSql("select * from TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime), INTERVAL '10' SECONDS))");

result.print();


I can see the printed results on the console each time a checkpoint
completes, and the checkpoint size almost doesn't grow.
But when I try WindowDeduplicate with this code:


   TableResult result = tEnv.executeSql("select * from " +
"(select ddate, userMAC, bssid, dtime, userName, apName, action, ssid, rawValue, ROW_NUMBER() OVER (" +
"PARTITION BY window_start, window_end, userName ORDER BY eventTime DESC" +
") as row_num from " +
"TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime), INTERVAL '5' SECONDS))" +
") where row_num <= 1");


result.print();


I can't see any printed results on the console, but from the logs I can see
that the checkpoint size grows after each checkpoint is triggered.
I want to know how I can make the records get written to the sink after
each checkpoint completes. Thanks.
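
One likely factor, though this is an assumption on my part rather than
something confirmed in this thread: results of a window TVF are emitted
only after the watermark passes the window end, whereas the plain
pass-through query produces rows immediately, and print() merely makes them
visible once a checkpoint completes. If the watermark stalls (for example,
because of idle sources), windows never close, so nothing reaches the sink
while the windowed state in checkpoints keeps growing. Below is a minimal
runnable sketch of the watermark declaration that drives window firing; the
datagen connector and the reduced column set are stand-ins for the real
aplog table:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class WindowFiringSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Stand-in source: datagen emits rows whose eventTime tracks the
        // wall clock, so the watermark advances continuously. The WATERMARK
        // clause is the key part: without an advancing watermark, TUMBLE
        // windows never close and nothing is emitted downstream.
        tEnv.executeSql(
                "CREATE TABLE aplog (" +
                "  userName STRING," +
                "  eventTime TIMESTAMP(3)," +
                "  WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector' = 'datagen'," +
                "  'rows-per-second' = '5'" +
                ")");

        // Each 10-second window fires as soon as the watermark passes its end.
        tEnv.executeSql(
                "SELECT window_start, window_end, userName FROM " +
                "TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime), INTERVAL '10' SECONDS))")
            .print();
    }
}

If the real source has partitions that can go idle, setting
table.exec.source.idle-timeout may also help keep the watermark moving.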

Re: Flink Forward Session Question

2023-01-02 Thread Márton Balassi
Hi Rion,

Unlike previous Flink Forwards, to the best of my knowledge the latest
edition was not uploaded to YouTube. It might make sense to reach out to
the authors directly.

On Sat, Dec 31, 2022 at 5:35 PM Rion Williams  wrote:

> Hey Flinkers,
>
> Firstly, early Happy New Year’s to everyone in the community. I’ve been
> digging a bit into exactly-once processing with Flink and Pinot and I came
> across this session from Flink Forward last year:
>
> -
> https://www.slideshare.net/FlinkForward/exactlyonce-financial-data-processing-at-scale-with-flink-and-pinot
>
> I was curious if anyone knew if this session was recorded, as the deck
> itself seemed to have quite a bit of value. I figured the mailing list
> would be a reasonable place to ask.
>
> Thanks in advance,
>
> Rion
>


Re: The use of zookeeper in flink

2023-01-02 Thread Matthias Pohl via user
And I screwed up the reply again. -.- Here's my previous response for the
ML thread and not only spoon_lz:

Hi spoon_lz,
Thanks for reaching out to the community and sharing your use case. You're
right that Flink's HA feature relies on leader election.
The HA backend not being responsive for too long might cause problems. I'm
not sure I fully understand what you mean when you say that the standby
JobManagers struggling with the ZK outage shouldn't affect the running jobs. If ZK is
not responding for the standby JMs, the actual JM leader should be affected
as well which, as a consequence, would affect the job execution. But I
might misunderstand your post. Logs would be helpful to get a better
understanding of your post's context.

Best,
Matthias

FYI: There is also (a kind of stalled) discussion in the dev ML [1] about
recovery of too many jobs affecting Flink's performance.

[1] https://lists.apache.org/thread/r3fnw13j5h04z87lb34l42nvob4pq2xj
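
For context, a sketch of the typical ZooKeeper HA settings involved here
(the quorum addresses and storage directory below are placeholders):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: hdfs:///flink/ha/

Leader election runs against the quorum, while job metadata is persisted to
the storage directory. If the quorum becomes unreachable for long enough,
the leader's ZK session times out and leadership is revoked, which, as far
as I understand, is what triggers the failovers described below.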

On Thu, Dec 29, 2022 at 8:55 AM spoon_lz  wrote:

> Hi All,
> We use ZooKeeper to achieve high availability of jobs. Recently, a failure
> occurred in our Flink cluster: the ZooKeeper service went down abnormally,
> and all the Flink jobs using this ZooKeeper failed over. The failover
> restart of a large number of jobs in a short period of time put too much
> pressure on the cluster, which in turn caused the cluster to crash.
> Afterwards, I checked the HA functions of ZK:
> 1. Leader election
> 2. Service discovery
> 3. State persistence
>
> The unavailability of the ZooKeeper service leads to failover of the Flink
> jobs. It seems this is because of the first point: the JM cannot confirm
> whether it is active or standby; the other two points should not affect it.
> But we didn't use standby JobManagers.
> So in my opinion, if no standby JobManager is used, the availability of the
> ZK service should not affect jobs that are running normally (of course, it
> is understandable that a job cannot be recovered correctly if an exception
> occurs). I don't know whether there is a way to achieve something like this.
>


Re: How does Flink plugin system work?

2023-01-02 Thread Matthias Pohl via user
Hi Ruibin,
could you switch to using the currently supported way for instantiating
reporters using the factory configuration parameter [1][2]?

Based on the ClassNotFoundException, your suspicion might be right that the
plugin didn't make it onto the classpath. Could you share the startup logs
of the JM and TMs? That might help in getting a bit more context on what's
going on. Your approach of integrating the reporter through the plugin
system [3] sounds about right as far as I can see.

Matthias

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#factory-class
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#prometheus
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/

On Fri, Dec 30, 2022 at 11:42 AM Ruibin Xing  wrote:

> Hi community,
>
> I am having difficulty understanding the Flink plugin system. I am
> attempting to enable the Prometheus exporter with the official Flink image
> 1.16.0, but I am experiencing issues with library dependencies. According
> to the plugin documentation (
> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/),
> as long as the library is located in the /opt/flink/plugins/
> directory, Flink should automatically load it, similar to how it loads
> libraries in the /opt/flink/lib directory. However, Flink does not seem to
> detect the plugin.
>
> Here is the directory structure for /opt/flink:
> > tree /opt/flink
> .
> 
> ├── plugins
> │   ├── metrics-prometheus
> │   │   └── flink-metrics-prometheus-1.16.0.jar
> ...
>
> And here is the related Flink configuration:
> > metrics.reporter.prom.class:
> org.apache.flink.metrics.prometheus.PrometheusReporter
>
> The error logs in the task manager show the following:
> 2022-12-30 10:03:55,840 WARN
>  org.apache.flink.runtime.metrics.ReporterSetup   [] - The
> reporter configuration of 'prom' configures the reporter class, which is a
> deprecated approach to configure reporters. Please configure a factory
> class instead: 'metrics.reporter.prom.factory.class: ' to
> ensure that the configuration continues to work with future versions.
> 2022-12-30 10:03:55,841 ERROR
> org.apache.flink.runtime.metrics.ReporterSetup   [] - Could not
> instantiate metrics reporter prom. Metrics might not be exposed/reported.
> java.lang.ClassNotFoundException:
> org.apache.flink.metrics.prometheus.PrometheusReporter
> at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
> at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown
> Source) ~[?:?]
> at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
> at java.lang.Class.forName0(Native Method) ~[?:?]
> at java.lang.Class.forName(Unknown Source) ~[?:?]
> at
> org.apache.flink.runtime.metrics.ReporterSetup.loadViaReflection(ReporterSetup.java:456)
> ~[flink-dist-1.16.0.jar:1.16.0]
> at
> org.apache.flink.runtime.metrics.ReporterSetup.loadReporter(ReporterSetup.java:409)
> ~[flink-dist-1.16.0.jar:1.16.0]
> at
> org.apache.flink.runtime.metrics.ReporterSetup.setupReporters(ReporterSetup.java:328)
> ~[flink-dist-1.16.0.jar:1.16.0]
> at
> org.apache.flink.runtime.metrics.ReporterSetup.fromConfiguration(ReporterSetup.java:209)
> ~[flink-dist-1.16.0.jar:1.16.0]
>
> The Java commands for Flink process:
> flink  1  3.0  4.6 2168308 765936 ?  Ssl  10:03   1:08
> /opt/java/openjdk/bin/java -XX:+UseG1GC -Xmx697932173 -Xms697932173
> -XX:MaxDirectMemorySize=300647712 -XX:MaxMetaspaceSize=268435456
> -Dlog.file=/opt/flink/log/flink--kubernetes-taskmanager-0-checkpoint-ha-example-taskmanager-1-1.log
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> -classpath
> /opt/flink/lib/flink-cep-1.16.0.jar:/opt/flink/lib/flink-connector-files-1.16.0.jar:/opt/flink/lib/flink-csv-1.16.0.jar:/opt/flink/lib/flink-json-1.16.0.jar:/opt/flink/lib/flink-scala_2.12-1.16.0.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.4.1-10.0.jar:/opt/flink/lib/flink-shaded-zookeeper-3.5.9.jar:/opt/flink/lib/flink-table-api-java-uber-1.16.0.jar:/opt/flink/lib/flink-table-planner-loader-1.16.0.jar:/opt/flink/lib/flink-table-runtime-1.16.0.jar:/opt/flink/lib/log4j-1.2-api-2.17.1.jar:/opt/flink/lib/log4j-api-2.17.1.jar:/opt/flink/lib/log4j-core-2.17.1.jar:/opt/flink/lib/log4j-slf4j-impl-2.17.1.jar:/opt/flink/lib/flink-dist-1.16.0.jar
> org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner
> --configDir /opt/flink/conf -Djobmanager.rpc.address=172.17.0.7
> -Dpipeline.classpaths= -Djobmanager.memory.off-heap.size=134217728b
> -Dweb.tmpdir=/tmp/flink-web-57b9e638-f313-4389-a75b-988509697edb
> -Djobmanager.rpc.port=6123
> -D.pipeline.job-id=a6f1c9fb
> -Drest.address=172.17.0.7