Re: Error about "Rejected TaskExecutor registration at the ResourceManger"

2021-06-01 Thread Kai Fu
Hi Jing,

Thank you for your reply. That cluster has been terminated; I will provide the
logs if the issue occurs again.

On Wed, Jun 2, 2021 at 11:17 AM JING ZHANG  wrote:

> Hi Kai,
> The reason why the job cannot be recovered may not be directly related to
> the exception you mentioned in your email.
> Would you provide the complete jobmanager.log and taskmanager.log? Maybe
> we could find some hints there.
>
> Best regards,
> JING ZHANG
>
> Kai Fu  于2021年6月2日周三 上午7:23写道:
>
>> HI Till,
>>
>> Thank you for your response. From my observation, the process lasted
>> for ~1 day and could not be recovered, so we finally killed the cluster.
>>
>> On Tue, Jun 1, 2021 at 9:47 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Kai,
>>>
>>> The rejection you are seeing should not be serious. The way this can
>>> happen is the following: If Yarn restarts the application master, Flink
>>> will try to recover previously started containers. If this is not possible,
>>> or Yarn only reports a subset of the previously allocated containers,
>>> then a container that has not been reported to the new ResourceManager
>>> will be rejected when it tries to register, because it is not known.
>>> The idea behind this behaviour is to only accept those resources which one
>>> has knowingly requested in order to free other resources which might belong
>>> to another Yarn application.
>>>
>>> In any case, the newly started Flink ResourceManager should request new
>>> containers so that there are enough TaskManagers available to run your job
>>> (assuming that the Yarn cluster has enough resources). Hence, the cluster
>>> should recover from this situation and there should not be a lot to worry
>>> about.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sun, May 30, 2021 at 7:36 AM Kai Fu  wrote:
>>>
 Hi team,

 We encountered an issue during recovery from a checkpoint. The job is
 recovering because the downstream Kafka sink was full for a while (about 4
 hours), so the job failed and kept trying to recover. The job could not
 recover from the checkpoint successfully, even after we scaled up the Kafka
 cluster, and shows the following exception. Is there any guidance on how to
 locate and avoid this kind of issue?

2021-05-30 01:31:21,419 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_*().
2021-05-30 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Resolved ResourceManager address, beginning registration
2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Fatal error occurred in TaskExecutor akka.tcp://flink@ip-10-61-98-204.ec2.internal:36631/user/rpc/taskmanager_0.
org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)

Re: flink1.11.2 yarn-session: some classpath entries are not loaded

2021-06-01 Thread datayangl
Starting the yarn-session itself is not a problem; the problem is that sql-client reports an error when using the yarn-session:
/opt/flink-1.11.2/bin/sql-client.sh embedded -s yarn-session
After starting sql-client and executing a Hive data query, the following error is reported:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server
error., 
SinkConversionToRow -> Sink: Unnamed': Loading the input/output formats
failed:
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
at
org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
at
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
at
org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:229)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:119)
at
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
at
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
at 
org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:272)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:140)
at
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
... 7 more
Caused by: java.lang.Exception: Loading the input/output formats failed:
at
org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:155)
at
org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:59)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:212)
... 19 more
Caused by: java.lang.RuntimeException: Deserializing the input/output
formats failed: org/apache/hadoop/mapred/JobConf
at
org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.(InputOutputFormatContainer.java:68)
at
org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:152)
... 21 more
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/JobConf
at java.lang.Class.getDeclaredFields0(Native Method)
at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
at java.lang.Class.getDeclaredField(Class.java:2068)
at 
java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.(ObjectStreamClass.java:468)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at java.util.HashMap.readObject(HashMap.java:1396)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 

Reply: Flink 1.12, yarn-application mode: logs are not visible in the Flink web UI

2021-06-01 Thread smq
Did you solve this? I ran into the same problem as well.



-- Original message --
From: todd (via http://apache-flink.147419.n8.nabble.com/)

Re: flink1.11.2 yarn-session: some classpath entries are not loaded

2021-06-01 Thread Zhiwen Sun
You shouldn't need the mapreduce-related libraries, should you?

The classpath loaded by my jobs doesn't include mapreduce either.

Zhiwen Sun



On Wed, Jun 2, 2021 at 11:56 AM datayangl  wrote:

> After starting a yarn-session with flink 1.11.2, I found that some classpath entries
> are never added to the class_path. The environment variable configuration is as follows:
> <
> http://apache-flink.147419.n8.nabble.com/file/t919/66604010-2A08-4A68-8478-70A27D61224B.png>
>
>
> The TaskManager (tm) log is as follows:
> tm.log
>
> The hadoop-mapreduce-client related classpath entries are never loaded into the
> class_path; any advice would be appreciated.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


flink1.11.2 yarn-session: some classpath entries are not loaded

2021-06-01 Thread datayangl
After starting a yarn-session with flink 1.11.2, I found that some classpath entries
are never added to the class_path. The environment variable configuration is as follows:



The TaskManager (tm) log is as follows:
tm.log

The hadoop-mapreduce-client related classpath entries are never loaded into the
class_path; any advice would be appreciated.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Unsubscribe

2021-06-01 Thread Yu Wang



Re: Error about "Rejected TaskExecutor registration at the ResourceManger"

2021-06-01 Thread JING ZHANG
Hi Kai,
The reason why the job cannot be recovered may not be directly related to
the exception you mentioned in your email.
Would you provide the complete jobmanager.log and taskmanager.log? Maybe
we could find some hints there.

Best regards,
JING ZHANG

Kai Fu  于2021年6月2日周三 上午7:23写道:

> HI Till,
>
> Thank you for your response. From my observation, the process lasted
> for ~1 day and could not be recovered, so we finally killed the cluster.
>
> On Tue, Jun 1, 2021 at 9:47 PM Till Rohrmann  wrote:
>
>> Hi Kai,
>>
>> The rejection you are seeing should not be serious. The way this can
>> happen is the following: If Yarn restarts the application master, Flink
>> will try to recover previously started containers. If this is not possible,
>> or Yarn only reports a subset of the previously allocated containers,
>> then a container that has not been reported to the new ResourceManager
>> will be rejected when it tries to register, because it is not known.
>> The idea behind this behaviour is to only accept those resources which one
>> has knowingly requested in order to free other resources which might belong
>> to another Yarn application.
>>
>> In any case, the newly started Flink ResourceManager should request new
>> containers so that there are enough TaskManagers available to run your job
>> (assuming that the Yarn cluster has enough resources). Hence, the cluster
>> should recover from this situation and there should not be a lot to worry
>> about.
>>
>> Cheers,
>> Till
>>
>> On Sun, May 30, 2021 at 7:36 AM Kai Fu  wrote:
>>
>>> Hi team,
>>>
>>> We encountered an issue during recovery from a checkpoint. The job is
>>> recovering because the downstream Kafka sink was full for a while (about 4
>>> hours), so the job failed and kept trying to recover. The job could not
>>> recover from the checkpoint successfully, even after we scaled up the Kafka
>>> cluster, and shows the following exception. Is there any guidance on how to
>>> locate and avoid this kind of issue?
>>>
>>> 2021-05-30 01:31:21,419 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_*().
>>> 2021-05-30 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Resolved ResourceManager address, beginning registration
>>> 2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Fatal error occurred in TaskExecutor akka.tcp://flink@ip-10-61-98-204.ec2.internal:36631/user/rpc/taskmanager_0.
>>> org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
>>>     at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
>>>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
>>>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>>     at ...

Unsubscribe

2021-06-01 Thread Fighting
Unsubscribe

Re: The /checkpoint/shared/ folder of a Flink SQL job keeps growing with no surge in the source data — how can this be controlled?

2021-06-01 Thread HunterXHunter
Will it keep growing indefinitely? I have been running the job for 4 days and the checkpoint keeps getting bigger, with no sign of stabilizing. Do I need to adjust the compaction configuration?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Does Flink support custom rate limiting?

2021-06-01 Thread suisuimu
Yes. Since we write to Elasticsearch, which does not support dynamic backpressure, we would like to know whether there is a configurable static rate-limiting approach.
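A minimal sketch of such static rate limiting (this is not a built-in Flink feature; Guava's RateLimiter is an assumed dependency and the class and parameter names are illustrative), applied as a pass-through operator placed in front of the Elasticsearch sink:

import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Pass-through mapper that throttles the records flowing into the sink.
public class ThrottleMapper<T> extends RichMapFunction<T, T> {

    private final double permitsPerSecondPerSubtask;
    private transient RateLimiter rateLimiter;

    public ThrottleMapper(double permitsPerSecondPerSubtask) {
        this.permitsPerSecondPerSubtask = permitsPerSecondPerSubtask;
    }

    @Override
    public void open(Configuration parameters) {
        // One limiter per parallel subtask, so the effective global rate is
        // roughly permitsPerSecondPerSubtask * parallelism.
        rateLimiter = RateLimiter.create(permitsPerSecondPerSubtask);
    }

    @Override
    public T map(T value) {
        rateLimiter.acquire(); // blocks until a permit is available
        return value;
    }
}

Chaining stream.map(new ThrottleMapper<>(500)).addSink(esSink) would then cap each parallel subtask at roughly 500 records per second, and the blocking acquire() also propagates backpressure upstream.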



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Unsubscribe

2021-06-01 Thread Yu Wang



Re: Error about "Rejected TaskExecutor registration at the ResourceManger"

2021-06-01 Thread Kai Fu
HI Till,

Thank you for your response. From my observation, the process lasted for
~1 day and could not be recovered, so we finally killed the cluster.

On Tue, Jun 1, 2021 at 9:47 PM Till Rohrmann  wrote:

> Hi Kai,
>
> The rejection you are seeing should not be serious. The way this can
> happen is the following: If Yarn restarts the application master, Flink
> will try to recover previously started containers. If this is not possible,
> or Yarn only reports a subset of the previously allocated containers,
> then a container that has not been reported to the new ResourceManager
> will be rejected when it tries to register, because it is not known.
> The idea behind this behaviour is to only accept those resources which one
> has knowingly requested in order to free other resources which might belong
> to another Yarn application.
>
> In any case, the newly started Flink ResourceManager should request new
> containers so that there are enough TaskManagers available to run your job
> (assuming that the Yarn cluster has enough resources). Hence, the cluster
> should recover from this situation and there should not be a lot to worry
> about.
>
> Cheers,
> Till
>
> On Sun, May 30, 2021 at 7:36 AM Kai Fu  wrote:
>
>> Hi team,
>>
>> We encountered an issue during recovery from a checkpoint. The job is
>> recovering because the downstream Kafka sink was full for a while (about 4
>> hours), so the job failed and kept trying to recover. The job could not
>> recover from the checkpoint successfully, even after we scaled up the Kafka
>> cluster, and shows the following exception. Is there any guidance on how to
>> locate and avoid this kind of issue?
>>
>> 2021-05-30 01:31:21,419 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_*().
>> 2021-05-30 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Resolved ResourceManager address, beginning registration
>> 2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Fatal error occurred in TaskExecutor akka.tcp://flink@ip-10-61-98-204.ec2.internal:36631/user/rpc/taskmanager_0.
>> org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
>>     at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
>>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
>>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
>>     at ...

Re: Does WatermarkStrategy.withIdleness work?

2021-06-01 Thread Dan Hill
JFYI in case other users find this in the future.

ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor has a small
issue if modified to be used with the new watermark API and if the events
can have the same timestamp.  I changed my code to do this in onPeriodicEmit.
In this situation, we have a lot of events with the same timestamp.  If the
code is still processing events for the same timestamp, periodic emit will
think we ran out of events (even though we've processed a bunch of events)
and then return a bad watermark.  We modified our copy of this code to keep
track of how many events have been emitted.  Since we're just using this
for local development, it's fine.
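For reference, a minimal sketch of that kind of modification against the new WatermarkGenerator interface (Flink 1.12+); the class and field names, the out-of-orderness bound, and the processing-time trailing interval are illustrative assumptions rather than the original extractor's code:

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

// Advances the watermark from processing time only when no events were seen since
// the last periodic emit; otherwise it behaves like a bounded-out-of-orderness generator.
public class ProcessingTimeTrailingWatermarkGenerator<T> implements WatermarkGenerator<T> {

    private final long maxOutOfOrdernessMs;
    private final long idleTrailingMs;

    private long maxTimestamp;
    private long eventsSeen = 0L;            // total events observed so far
    private long eventsSeenAtLastEmit = 0L;  // snapshot taken at the previous periodic emit

    public ProcessingTimeTrailingWatermarkGenerator(long maxOutOfOrdernessMs, long idleTrailingMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTrailingMs = idleTrailingMs;
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        eventsSeen++;
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        long eventTimeWatermark = maxTimestamp - maxOutOfOrdernessMs - 1;
        if (eventsSeen != eventsSeenAtLastEmit) {
            // Events arrived since the last emit (possibly all with the same timestamp):
            // emit a normal event-time watermark and do NOT advance artificially.
            output.emitWatermark(new Watermark(eventTimeWatermark));
        } else {
            // No new events at all: advance based on processing time so that
            // downstream timers and windows can still fire during local testing.
            long artificial = System.currentTimeMillis() - idleTrailingMs;
            output.emitWatermark(new Watermark(Math.max(artificial, eventTimeWatermark)));
        }
        eventsSeenAtLastEmit = eventsSeen;
    }
}

It can be plugged in via WatermarkStrategy.forGenerator(ctx -> new ProcessingTimeTrailingWatermarkGenerator<>(1_000, 60_000)).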


On Fri, Mar 12, 2021 at 1:55 AM Dan Hill  wrote:

> Thanks David!
>
> On Fri, Mar 12, 2021, 01:54 David Anderson  wrote:
>
>> WatermarkStrategy.withIdleness works by marking idle streams as idle, so
>> that downstream operators will ignore those streams and allow the
>> watermarks to progress based only on the advancement of the watermarks of
>> the still active streams. As you suspected, this mechanism does not provide
>> for the watermark to be advanced in situations where all of the streams are
>> idle.
>>
>> If your goal is to ensure that all of the events are processed and all
>> event-time timers are fired (and all event-time windows are closed) before
>> a job ends, Flink already includes a mechanism for this purpose. If you are
>> using a bounded source, then when that source reaches the end of its input,
>> a final Watermark of value Watermark.MAX_WATERMARK will be automatically
>> emitted. The --drain option, as in
>>
>> ./bin/flink stop --drain 
>>
>> also has this effect [1].
>>
>> With a Kafka source, you can arrange for this to happen by having your
>> kafka deserializer return true from its isEndOfStream() method. Or you
>> could use the new KafkaSource connector included in Flink 1.12 with
>> its setBounded option.
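As an illustration of the isEndOfStream() route, here is a minimal sketch assuming String records and a made-up "END_OF_STREAM" sentinel value; such a schema would be passed to the FlinkKafkaConsumer constructor in place of a plain string schema:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Ends the stream (and lets the final MAX_WATERMARK fire all timers/windows)
// once the sentinel value is read from the topic.
public class BoundedStringSchema implements KafkaDeserializationSchema<String> {

    private static final String SENTINEL = "END_OF_STREAM"; // illustrative sentinel

    @Override
    public boolean isEndOfStream(String nextElement) {
        return SENTINEL.equals(nextElement);
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        return record.value() == null ? null : new String(record.value(), StandardCharsets.UTF_8);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}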
>>
>> On the other hand, if you really did need to advance the watermark
>> despite a (possibly temporary) total lack of events, you could implement a
>> watermark strategy that artificially advances the watermark based on the
>> passage of processing time. You'll find an example in [2], though it hasn't
>> been updated to use the new watermark strategy interface.
>>
>> Regards,
>> David
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#stopping-a-job-gracefully-creating-a-final-savepoint
>> [2]
>> https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java
>>
>> On Fri, Mar 12, 2021 at 9:47 AM Dan Hill  wrote:
>>
>>> I haven't been able to get WatermarkStrategy.withIdleness to work.  Is
>>> it broken?  None of my timers trigger when I'd expect idleness to take over.
>>>
>>> On Tue, Mar 2, 2021 at 11:15 PM Dan Hill  wrote:
>>>
 Hi.

 For local and tests development, I want to flush the events in my
 system to make sure I'm processing everything.  My watermark does not
 progress to finish all of the data.

 What's the best practice for local development or tests?

 If I use idle sources for 1 Kafka partition, this appears broken.  I'm
 guessing there is logic to prevent removing an idle partition if it's the
 only partition.  Is there a version of this I can enable for local
 development that supports 1 partition?

 I see this tech talk.  Are there other talks to watch?
 https://www.youtube.com/watch?v=bQmz7JOmE_4

 Do I need to write my own watermark generator?  Or change my test data
 to have a way of generating watermarks?

 I've tried a few variants of the following source code.  The watermark
 doesn't progress in the operator right after creating the source.

 SingleOutputStreamOperator viewInput = env.addSource(...)
     .uid("source-view")
     .assignTimestampsAndWatermarks(
         WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1))
             .withIdleness(Duration.ofMinutes(1)));

>>>


Re: How to check is Kafka partition "idle" in emitRecordsWithTimestamps

2021-06-01 Thread Alexey Trenikhun
Hi Till,

>However, this will stall the whole reading process if there is a partition 
>which has no more data. Hence, you will probably also need a mechanism to 
>advance the watermark if the partition becomes idle.
This is why I need to find out whether a partition is idle. It looks like the Flink
Kafka connector definitely has this information: the derived class
KafkaTopicPartitionStateWithWatermarkGenerator has immediateOutput and
deferredOutput, which have a state field with an idle flag.

Thank you for the information about the new KafkaConnector. I assume that you are
referring to [1], but it also seems stalled. Or are you talking about a different
task?

[1]-https://issues.apache.org/jira/browse/FLINK-18450
[FLINK-18450] Add watermark alignment logic to SourceReaderBase - ASF JIRA

Thanks,
Alexey

From: Till Rohrmann 
Sent: Tuesday, June 1, 2021 6:24 AM
To: Alexey Trenikhun 
Cc: Flink User Mail List 
Subject: Re: How to check is Kafka partition "idle" in emitRecordsWithTimestamps

Hi Alexey,

looking at KafkaTopicPartitionState, it looks like it does not contain this
information. In a nutshell, what you probably have to do is to aggregate the
watermarks across all partitions and then pause the consumption of a partition
if its watermark advances too far with respect to the minimum watermark. However, this
will stall the whole reading process if there is a partition which has no more
data. Hence, you will probably also need a mechanism to advance the watermark
if the partition becomes idle.

Note that the community is currently working on a new KafkaConnector based on 
Flink's new source API (FLIP-27). If I am not mistaken, then these new 
interfaces should eventually also support event time alignment.

Cheers,
Till

On Fri, May 28, 2021 at 7:17 PM Alexey Trenikhun  wrote:
Hello,
I'm thinking about implementing a custom Kafka connector which provides event
alignment (similar to FLINK-10921, which seems abandoned). What is the way to
determine whether a partition is idle from an override of
AbstractFetcher.emitRecordsWithTimestamps()? Does KafkaTopicPartitionState have
this information?

Thanks,
Alexey


Re: Best practice for adding support for Kafka variants

2021-06-01 Thread deepthi Sridharan
Thank you, Roman. I should have said our own flavor of Kafka and not
version. Thanks for the reference to the new source and sink
interfaces, though; it seems like those are the interfaces we should be
implementing for our custom Kafka connector.

I did notice however that the FLIP does not cover table interfaces. The
KafkaDynamicTableFactory for example is still creating a FlinkKafkaConsumer
instance. Is that something that will change in the future or are the table
interfaces somehow exceptions to the advantages of the new interface?

-- 
Regards,
Deepthi

On Thu, May 20, 2021 at 12:23 PM Roman Khachatryan  wrote:

> Hi,
>
> Those classes will likely be deprecated in the future in favor of
> FLIP-27 [1][2] source and FLIP-143 [3] sink implementations and
> eventually removed (though it won't happen soon).
> You probably should take a look at the above new APIs.
>
> Either way, there is no such recommendation AFAIK. Copied connector
> classes will have to be updated if something in Flink changes. Maybe a
> better way would be to build your own kafka client and use it to build
> flink-kafka connector (by overriding ${kafka.version} for example).
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> [2]
> https://issues.apache.org/jira/browse/FLINK-18323
> [3]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
>
> Regards,
> Roman
>
> On Thu, May 20, 2021 at 7:45 PM deepthi Sridharan
>  wrote:
> >
> > Hi,
> >
> > We have an internal version of Open source Kafka consumer and producer
> that we use and are working on adding that as a source and sink for flink.
> >
> > It seems like the easiest way to add the consumer as source would be to
> override the FlinkKafkaConsumer class's createFetcher method to provide our
> own derived class of the KafkaFetcher class which can hook up its own version of
> the consumerThread. But the fetcher classes are annotated as Internal and
> seems like it is not meant to be used this way. (And the changes for
> Producer would be on similar lines).
> >
> > Is there a recommendation for how to add new flavors of Kafka
> Consumer/Producer from the community? Would it be recommended to maintain a
> copy of all the connector classes so we don't have to deal with changes to
> classes tagged as internal?
> >
> > --
> > Thanks & Regards
> >
>


Re: Flink Metrics Naming

2021-06-01 Thread Chesnay Schepler

Some more background on MetricGroups:
Internally there are (mostly) 3 types of metric groups:
On the one hand we have the ComponentMetricGroups (like 
TaskManagerMetricGroup) that describe a high-level Flink entity and 
just add a constant expression to the logical scope (like taskmanager, 
task, etc.). These exist to support scope formats (although this 
should've been implemented differently, but that's another story).


On the other hand we have groups created via addGroup(String), which are 
added to the logical scope as is; this is sometimes good (e.g., 
addGroup("KafkaConsumer")) and sometimes isn't (e.g., 
addGroup(<some dynamic value>)).
Finally, there is an addGroup(String, String) variant, which behaves like 
a key-value pair (and similarly to the ComponentMetricGroup). The key 
part is added to the logical scope, and a label is usually added as well.

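As a concrete illustration, here is a minimal sketch from a user function's perspective (the group, topic, and metric names are made up for the example):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class MyMapper extends RichMapFunction<String, String> {
    private transient Counter plainGroupCounter;
    private transient Counter keyValueGroupCounter;

    @Override
    public void open(Configuration parameters) {
        // addGroup(String): "kafkaConsumer" becomes part of the logical scope
        // (and hence of the reported metric identifier).
        plainGroupCounter = getRuntimeContext()
                .getMetricGroup()
                .addGroup("kafkaConsumer")
                .counter("recordsProcessed");

        // addGroup(String, String): behaves like a key-value pair; the key goes
        // into the logical scope and the value is typically exposed as a label.
        keyValueGroupCounter = getRuntimeContext()
                .getMetricGroup()
                .addGroup("topic", "my-topic")
                .counter("recordsProcessed");
    }

    @Override
    public String map(String value) {
        plainGroupCounter.inc();
        keyValueGroupCounter.inc();
        return value;
    }
}

With the default scope formats, the first counter's identifier would roughly end in ...kafkaConsumer.recordsProcessed, while the second would end in ...topic.my-topic.recordsProcessed, with label-aware reporters additionally exposing topic=my-topic.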

Due to historical reasons some parts in Flink use addGroup(String) 
despite the key-value pair variant being more appropriate; the latter 
was only added later, as was the logical scope as a whole for that matter.


With that said, the logical scope and labels suffer a bit due to being 
retrofitted on an existing design and some early mistakes in the metric 
structuring.
Ideally (imo), things would work like this (*bold* parts signify changes 
to the current behavior):
- addGroup(String) is *sparsely used* and only for high-level 
hierarchies (job, operator, source, kafka). It is added as is to the 
logical scope, creates no label, and is *excluded from the metric 
identifier*.
- addGroup(String, String) has *no effect on the logical scope*, creates 
a label, and is added as <key>.<value> to the metric identifier.


The core issue with these kinds of changes, however, is backwards 
compatibility. We would have to do a sweep over the code-base to migrate 
inappropriate usages of addGroup(String) to the key-value pair variant, 
probably remove some unnecessary groups (e.g., "Status", which is used for 
CPU metrics and whatnot) and finally make changes to the metric system 
internals, all of which need a codepath that retains the current behavior.


Simply put, for immediate needs I would probably encourage you to create 
a modified PrometheusReporter which determines the logical scope as you 
see fit; it could just ignore the logical scope entirely (although I'm 
not sure how well Prometheus handles one metric having multiple instances 
with different label sets, e.g., numRecordsIn for operators/tasks), or 
exclude user-defined groups with something hacky like only using the 
first 4 parts of the logical scope.


On 6/1/2021 4:56 PM, Mason Chen wrote:
Upon further inspection, it seems like the user scope is not universal 
(i.e. comes through the connectors and not UDFs (like rich map 
function)), but the question still stands if the process makes sense.


On Jun 1, 2021, at 10:38 AM, Mason Chen > wrote:


Makes sense. We are primarily concerned with removing the metric 
labels from the names as the user metrics get too long. i.e. the 
groups from `addGroup` are concatenated in the metric name.


Do you think there would be any issues with removing the group 
information from the metric name and putting it into a label instead? 
It seems like most metrics internally don't use `addGroup` to create 
group information, but rather create another subclass of metric 
group.


Perhaps, I should ONLY apply this custom logic to metrics with the 
“user” scope? Other scoped metrics (e.g. operator, task operator, 
etc.) shouldn’t have these group names in the metric names in my 
experience...


An example just for clarity, 
flink__group1_group2_metricName{group1=…, group2=…, 
flink tags}


=>
flink__metricName{group_info=group1_group2, group1=…, 
group2=…, flink tags}


On Jun 1, 2021, at 9:57 AM, Chesnay Schepler > wrote:


The uniqueness of metrics and the naming of the Prometheus reporter 
are somewhat related but also somewhat orthogonal.


Prometheus works similar to JMX in that the metric name (e.g., 
taskmanager.job.task.operator.numRecordsIn) is more or less a 
_class_ of metrics, with tags/labels allowing you to select a 
specific instance of that metric.


Restricting metric names to 1 level of the hierarchy would present a 
few issues:
a) Effectively, all metric names that Flink uses effectively become 
reserved keywords that users must not use, which will lead to 
headaches when adding more metrics or forwarding metrics from 
libraries (e.g., kafka), because we could always break existing 
user-defined metrics.
b) You'd need a cluster-wide lookup that is aware of all hierarchies 
to ensure consistency across all processes.


In the end, there are significantly easier ways to solve the issue 
of the metric name being too long, i.e., give the user more control 
over the logical scope (taskmanager.job.task.operator), be it 
shortening the names (t.j.t.o), limiting the depth (e.g, 
operator.numRecordsIn), removing it outright (but I'd prefer 

Re: Reading Flink state from a savepoint using the State Processor API

2021-06-01 Thread Seth Wiesman
Hi Min,

The only requirement is that your state descriptors be configured
identically to those used in your DataStream API job. So if you registered
custom TypeInformation / serializers in your streaming job, you will need
those here as well. I would also look at the ExecutionConfig of your
DataStream app, as that can dictate how your serializers are configured.

Seth
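For illustration, a minimal sketch (Flink 1.10 State Processor API; the savepoint path, operator uid, state name, String key type, and the Avro-generated MyAvroRecord class are all assumptions for the example) of reading keyed Avro state with a descriptor that explicitly carries the Avro type information:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

// MyAvroRecord stands in for an Avro-generated SpecificRecord class.
public class ReadAvroState {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint =
                Savepoint.load(env, "file:///tmp/savepoints/savepoint-xyz", new MemoryStateBackend());

        DataSet<MyAvroRecord> state =
                savepoint.readKeyedState("my-operator-uid", new AvroStateReader());
        state.print();
    }

    static class AvroStateReader extends KeyedStateReaderFunction<String, MyAvroRecord> {
        private transient ValueState<MyAvroRecord> lastRecord;

        @Override
        public void open(Configuration parameters) {
            // The descriptor must match the one in the streaming job, including the
            // Avro type information, so the matching serializer is used on read.
            ValueStateDescriptor<MyAvroRecord> descriptor =
                    new ValueStateDescriptor<>("last-record", new AvroTypeInfo<>(MyAvroRecord.class));
            lastRecord = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void readKey(String key, Context ctx, Collector<MyAvroRecord> out) throws Exception {
            out.collect(lastRecord.value());
        }
    }
}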

On Tue, Jun 1, 2021 at 10:24 AM Till Rohrmann  wrote:

> Hi Min,
>
> Usually, you should be able to provide type information and thereby a
> serializer via the StateDescriptors which you create to access the state.
> If this is not working, then you need to give us a bit more context to
> understand what's not working.
>
> I am also pulling in Seth who is the original author of the state
> processor API.
>
> Cheers,
> Till
>
> On Mon, May 31, 2021 at 4:00 PM Tan, Min  wrote:
>
>> Hi,
>>
>>
>>
>> I am using Flink 1.10.1 and am trying to read Flink state from a savepoint
>> using the Flink State Processor API.
>>
>> It works well when the state types are normal Java types or Java POJOs.
>>
>> When Avro-generated Java classes are used as the state type, it does not
>> read any state anymore.
>>
>> Are any additional custom serializers required in this situation?
>>
>>
>>
>> Regards,
>>
>> Min
>>
>>
>>
>>
>>
>


Re: S3 + Parquet credentials issue

2021-06-01 Thread Till Rohrmann
Hi Angelo,

what Svend has written is very good advice. Additionally, you could give us
a bit more context by posting the exact stack trace and the exact
configuration you use to deploy the Flink cluster. To me this looks like a
configuration/setup problem in combination with AWS.

Cheers,
Till

On Mon, May 31, 2021 at 10:49 PM Svend  wrote:

> Hi Angelo,
>
> I've not worked exactly in the situation you described, although I've had
> to configure S3 access from a Flink application recently and here are a
> couple of things I learnt along the way:
>
> * You should normally not need to include flink-s3-fs-hadoop nor
> hadoop-mapreduce-client-core in your classpath but should rather make
> flink-s3-fs-hadoop available to Flink by putting it into the plugins
> folder. The motivation for that is that this jar is a fat jar containing a
> lot of hadoop and aws classes, s.t. including it in your classpath quickly
> leads to conflicts. The plugins folder is associated with a separate
> classpath, which helps avoid those conflicts.
>
> * Under the hood, Flink is using the hadoop-aws library to connect to s3 =>
> the documentation regarding how to configure it, and especially security
> accesses, is available in [1]
>
> * Ideally, when running on AWS, your code should not be using
> BasicAWSCredentialsProvider, but instead the application should assume a
> role, which you associate with some IAM permission.  If that's your case,
> the specific documentation for that situation is in [2]. If you're running
> some test locally on your laptop, BasicAWSCredentialsProvider with some
> key id and secret pointing to a dev account may be appropriate.
>
> * As I understand it, any configuration entry in flink.yaml that starts
> with "fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in
> [3]) => by reading documentation in [1] and [2] you might be able to figure
> out which parameters are relevant to your case, which you can then set with
> the mechanism just mentioned. For example, in my case, I simply add this to
> flink.yaml:
>
> fs.s3a.aws.credentials.provider:
> "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
>
> * You can debug the various operations that are attempted on S3 by setting
> this logger to DEBUG level:  org.apache.hadoop.fs.s3a
>
>
> Good luck :)
>
> Svend
>
>
>
> [1]
> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html
> [2]
> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/assumed_roles.html
> [3]
> https://ververica.zendesk.com/hc/en-us/articles/360002452379-How-to-set-arbitrary-S3-configuration-options-Hadoop-S3A-Presto-S3-in-Flink-
>
>
> On Mon, 31 May 2021, at 3:52 PM, Angelo G. wrote:
>
> Hello,
>
> Trying to read a parquet file located in S3 leads to a AWS credentials
> exception. Switching to other format (raw, for example) works ok regarding
> to file access.
>
> This is a snippet of code to reproduce the issue:
>
> static void parquetS3Error() {
>
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();
>
> TableEnvironment t_env = TableEnvironment.create(settings);
>
> // parquet format gives error:
> // Caused by: java.net.SocketTimeoutException: doesBucketExist on 
> bucket-prueba-medusa: com.amazonaws.AmazonClientException:
> // No AWS Credentials provided by BasicAWSCredentialsProvider 
> EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
> // com.amazonaws.SdkClientException: Failed to connect to service 
> endpoint:
> t_env.executeSql("CREATE TABLE backup (  `date` STRING,  `value` INT) 
> WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 
> 'parquet')");
>
> // other formats (i.e. raw) work properly:
> // Job has been submitted with JobID 6ecd31d322aba759f9b8b591e9f4fed5
> //++
> //|url |
> //++
> //| [80, 65, 82, 49, 21, 0, 21,... |
> //| [0, 0, 0, 50, 48, 50, 49, 4... |
> t_env.executeSql("CREATE TABLE backup (  `url` BINARY) WITH ( 'connector' 
> = 'filesystem', 'path' = 's3a://.../', 'format' = 'raw')");
>
> Table t1 = t_env.from("backup");
>
> t1.execute().print();
>
> }
>
> Flink version is 1.12.2.
>
> Please find attached the pom with dependencies and version numbers.
>
> What would be a suitable workaround for this?
>
> Thank you very much.
>
> Angelo.
>
>
>
>
> *Attachments:*
>
>- pom.xml
>
>
>


Re: Reading Flink state from a savepoint using the State Processor API

2021-06-01 Thread Till Rohrmann
Hi Min,

Usually, you should be able to provide type information and thereby a
serializer via the StateDescriptors which you create to access the state.
If this is not working, then you need to give us a bit more context to
understand what's not working.

I am also pulling in Seth who is the original author of the state processor
API.

Cheers,
Till

On Mon, May 31, 2021 at 4:00 PM Tan, Min  wrote:

> Hi,
>
>
>
> I am using Flink 1.10.1 and am trying to read Flink state from a savepoint
> using the Flink State Processor API.
>
> It works well when the state types are normal Java types or Java POJOs.
>
> When Avro-generated Java classes are used as the state type, it does not
> read any state anymore.
>
> Are any additional custom serializers required in this situation?
>
>
>
> Regards,
>
> Min
>
>
>
>
>


Re: savepoint fail

2021-06-01 Thread Till Rohrmann
Responded as part of the following discussion
https://lists.apache.org/x/thread.html/re85a1fb3f17b5ef1af2844fc1a07b6d2f5e6237bf4c33059e13890ee@%3Cuser.flink.apache.org%3E.
Let's continue the discussion there.

Cheers,
Till

On Mon, May 31, 2021 at 11:02 AM 周瑞  wrote:

> Hi,
>   When "sink.semantic = exactly-once", the following exception is
> thrown when recovering from a savepoint:
>
>public static final String KAFKA_TABLE_FORMAT =
> "CREATE TABLE "+TABLE_NAME+" (\n" +
> "  "+COLUMN_NAME+" STRING\n" +
> ") WITH (\n" +
> "   'connector' = 'kafka',\n" +
> "   'topic' = '%s',\n" +
> "   'properties.bootstrap.servers' = '%s',\n" +
> "   'sink.semantic' = 'exactly-once',\n" +
> "   'properties.transaction.timeout.ms' =
> '90',\n" +
> "   'sink.partitioner' =
> 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
> "   'format' = 'dbz-json'\n" +
> ")\n";
>   [] - Source: TableSourceScan(table=[[default_catalog, default_database,
> debezium_source]], fields=[data]) -> Sink: Sink
> (table=[default_catalog.default_database.KafkaTable], fields=[data]) (1/1
> )#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to
> FAILED with failure cause: org.apache.kafka.common.KafkaException:
> Unexpected error in InitProducerIdResponse; Producer attempted an
> operation with an old epoch. Either there is a newer producer with the
> same transactionalId, or the producer's transaction has been expired by
> the broker.
> at org.apache.kafka.clients.producer.internals.
> TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager
> .java:1352)
> at org.apache.kafka.clients.producer.internals.
> TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:
> 1260)
> at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse
> .java:109)
> at org.apache.kafka.clients.NetworkClient.completeResponses(
> NetworkClient.java:572)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
> at org.apache.kafka.clients.producer.internals.Sender
> .maybeSendAndPollTransactionalRequest(Sender.java:414)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender
> .java:312)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:
> 239)
> at java.lang.Thread.run(Thread.java:748)
>


Re: Flink Metrics Naming

2021-06-01 Thread Mason Chen
Upon further inspection, it seems like the user scope is not universal (i.e. 
comes through the connectors and not UDFs (like rich map function)), but the 
question still stands if the process makes sense.

> On Jun 1, 2021, at 10:38 AM, Mason Chen  wrote:
> 
> Makes sense. We are primarily concerned with removing the metric labels from 
> the names as the user metrics get too long. i.e. the groups from `addGroup` 
> are concatenated in the metric name.
> 
> Do you think there would be any issues with removing the group information from 
> the metric name and putting it into a label instead? It seems like most 
> metrics internally don't use `addGroup` to create group information but 
> rather create another subclass of metric group.
> 
> Perhaps, I should ONLY apply this custom logic to metrics with the “user” 
> scope? Other scoped metrics (e.g. operator, task operator, etc.) shouldn’t 
> have these group names in the metric names in my experience...
> 
> An example just for clarity, 
> flink__group1_group2_metricName{group1=…, group2=…, flink tags}
> 
> =>
> 
> flink__metricName{group_info=group1_group2, group1=…, group2=…, 
> flink tags}
> 
>> On Jun 1, 2021, at 9:57 AM, Chesnay Schepler > > wrote:
>> 
>> The uniqueness of metrics and the naming of the Prometheus reporter are 
>> somewhat related but also somewhat orthogonal.
>> 
>> Prometheus works similar to JMX in that the metric name (e.g., 
>> taskmanager.job.task.operator.numRecordsIn) is more or less a _class_ of 
>> metrics, with tags/labels allowing you to select a specific instance of that 
>> metric.
>> 
>> Restricting metric names to 1 level of the hierarchy would present a few 
>> issues:
>> a) Effectively, all metric names that Flink uses effectively become reserved 
>> keywords that users must not use, which will lead to headaches when adding 
>> more metrics or forwarding metrics from libraries (e.g., kafka), because we 
>> could always break existing user-defined metrics.
>> b) You'd need a cluster-wide lookup that is aware of all hierarchies to 
>> ensure consistency across all processes.
>> 
>> In the end, there are significantly easier ways to solve the issue of the 
>> metric name being too long, i.e., give the user more control over the 
>> logical scope (taskmanager.job.task.operator), be it shortening the names 
>> (t.j.t.o), limiting the depth (e.g, operator.numRecordsIn), removing it 
>> outright (but I'd prefer some context to be present for clarity) or 
>> supporting something similar to scope formats.
>> I'm reasonably certain there are some tickets already in this direction, we 
>> just don't get around to doing them because for the most part the metric 
>> system works good enough and there are bigger fish to fry.
>> 
>> On 6/1/2021 3:39 PM, Till Rohrmann wrote:
>>> Hi Mason,
>>> 
>>> The idea is that a metric is not uniquely identified by its name alone but 
>>> instead by its path. The groups in which it is defined specify this path 
>>> (similar to directories). That's why it is valid to specify two metrics 
>>> with the same name if they reside in different groups.
>>> 
>>> I think Prometheus does not support such a tree structure and that's why 
>>> the path is exposed via labels if I am not mistaken. So long story short, 
>>> what you are seeing is a combination of how Flink organizes metrics and 
>>> what can be reported to Prometheus. 
>>> 
>>> I am also pulling in Chesnay who is more familiar with this part of the 
>>> code.
>>> 
>>> Cheers,
>>> Till
>>> 
>>> On Fri, May 28, 2021 at 7:33 PM Mason Chen >> > wrote:
>>> Can anyone give insight as to why Flink allows 2 metrics with the same 
>>> “name”?
>>> 
>>> For example,
>>> 
>>> getRuntimeContext.addGroup(“group”, “group1”).counter(“myMetricName”);
>>> 
>>> And
>>> 
>>> getRuntimeContext.addGroup(“other_group”, 
>>> “other_group1”).counter(“myMetricName”);
>>> 
>>> Are totally valid.
>>> 
>>> 
>>> It seems that it has lead to some not-so-great implementations—the 
>>> prometheus reporter and attaching the labels to the metric name, making the 
>>> name quite verbose.
>>> 
>>> 
>> 
> 



Re: classloader.resolve-order is not honored when submitting job to a remote cluster

2021-06-01 Thread tao xiao
Hi Till,

The PR covers the problem and it will fix the inconsistent class loading
issue

On Tue, Jun 1, 2021 at 9:55 PM Till Rohrmann  wrote:

> Hi Tao,
>
> I think this looks like a bug to me. Could it be that this problem is
> covered by [1, 2]? Maybe you can review this PR and check whether it solves
> the problem. If yes, then let's quickly get it in.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21445
> [2] https://github.com/apache/flink/pull/15020
>
> Cheers,
> Till
>
> On Sun, May 30, 2021 at 9:41 AM tao xiao  wrote:
>
>> Hi team,
>>
>> I discovered that child first class loader is always used to initialize
>> the main program when submitting the job to a yarn cluster using
>> application mode regardless of what value classloader.resolve-order is set
>> in flink-conf.yaml. But this is not the case if I submit the same job with
>> the same config to the local cluster, which honors the config and uses the
>> correct class loader to load the main program. Here is the log from the local
>> cluster:
>>
>> 2021-05-30 15:01:16,372 INFO  org.apache.flink.client.cli.CliFrontend
>>  [] -
>> 
>> 2021-05-30 15:01:16,375 INFO  org.apache.flink.client.cli.CliFrontend
>>  [] -  Starting Command Line Client (Version: 1.12.1,
>> Scala: 2.11, Rev:dc404e2, Date:2021-01-09T14:46:36+01:00)
>> [trim down the log]
>> *2021-05-30 15:01:16,616 INFO
>>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
>> configuration property: classloader.resolve-order, parent-first*
>> 2021-05-30 15:01:16,763 WARN  org.apache.flink.runtime.util.HadoopUtils
>>  [] - Could not find Hadoop configuration via any of the
>> supported methods (Flink configuration, environment variables).
>> [trim down the log]
>> 2021-05-30 15:01:16,830 INFO  org.apache.flink.client.ClientUtils
>>  [] - Starting program (detached: false)
>> *2021-05-30 15:01:16,871 INFO  io.demo.flink.WordCount
>>[] - Loaded by
>> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ParentFirstClassLoader@14c053c6*
>>
>> Here is the log from yarn cluster
>> 2021-05-30 07:20:14,434 INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>> 
>> 2021-05-30 07:20:14,438 INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>>  Starting YarnApplicationClusterEntryPoint (Version: 1.12.1, Scala: 2.11,
>> Rev:dc404e2, Date:2021-01-09T14:46:36+01:00)
>> [trim down the log]
>> 2021-05-30 07:20:15,205 INFO
>>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
>> configuration property: taskmanager.memory.process.size, 2048m
>> *2021-05-30 07:20:15,205 INFO
>>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
>> configuration property: classloader.resolve-order, parent-first*
>> 2021-05-30 07:20:15,205 INFO
>>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
>> configuration property: metrics.scope.jm, flink.jobmanager
>> [trim down the log]
>> *2021-05-30 07:20:21,383 INFO  io.demo.flink.WordCount
>>[] - Loaded by
>> org.apache.flink.util.ChildFirstClassLoader@3da30852*
>>
>> Here is the job to reproduce the problem
>>
>> public static void main(String[] args) throws Exception {
>>
>>  // set up the execution environment
>>  final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>  LOG.info("Loaded by {}", WordCount.class.getClassLoader());
>>  // get input data
>>  DataStreamSource<String> text = env.fromElements(
>>"To be, or not to be,--that is the question:--",
>>"Whether 'tis nobler in the mind to suffer",
>>"The slings and arrows of outrageous fortune",
>>"Or to take arms against a sea of troubles,"
>>);
>>
>>text.print();
>>  env.execute("demo job");
>>
>> }
>>
>>
>> Flink version 1.12.1
>>
>> I believe the inconsistency is the result of the user-defined flink-conf not
>> being passed to the PackagedProgram, which uses the default config instead:
>>
>>
>> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java#L109
>>
>> Not sure if this is expected behavior, i.e. that we never assume the main
>> program is loaded with the configured class loader.
>> --
>> Regards,
>> Tao
>>
>

-- 
Regards,
Tao


Re: Flink Metrics Naming

2021-06-01 Thread Mason Chen
Makes sense. We are primarily concerned with removing the metric labels from 
the names as the user metrics get too long. i.e. the groups from `addGroup` are 
concatenated in the metric name.

Do you think there would be any issues with removing the group information from 
the metric name and putting it into a label instead? It seems like most 
metrics internally don't use `addGroup` to create group information but rather 
create another subclass of metric group.

Perhaps, I should ONLY apply this custom logic to metrics with the “user” 
scope? Other scoped metrics (e.g. operator, task operator, etc.) shouldn’t have 
these group names in the metric names in my experience...

An example just for clarity, 
flink__group1_group2_metricName{group1=…, group2=…, flink tags}

=>

flink__metricName{group_info=group1_group2, group1=…, group2=…, 
flink tags}

> On Jun 1, 2021, at 9:57 AM, Chesnay Schepler  wrote:
> 
> The uniqueness of metrics and the naming of the Prometheus reporter are 
> somewhat related but also somewhat orthogonal.
> 
> Prometheus works similar to JMX in that the metric name (e.g., 
> taskmanager.job.task.operator.numRecordsIn) is more or less a _class_ of 
> metrics, with tags/labels allowing you to select a specific instance of that 
> metric.
> 
> Restricting metric names to 1 level of the hierarchy would present a few 
> issues:
> a) Effectively, all metric names that Flink uses effectively become reserved 
> keywords that users must not use, which will lead to headaches when adding 
> more metrics or forwarding metrics from libraries (e.g., kafka), because we 
> could always break existing user-defined metrics.
> b) You'd need a cluster-wide lookup that is aware of all hierarchies to 
> ensure consistency across all processes.
> 
> In the end, there are significantly easier ways to solve the issue of the 
> metric name being too long, i.e., give the user more control over the logical 
> scope (taskmanager.job.task.operator), be it shortening the names (t.j.t.o), 
> limiting the depth (e.g, operator.numRecordsIn), removing it outright (but 
> I'd prefer some context to be present for clarity) or supporting something 
> similar to scope formats.
> I'm reasonably certain there are some tickets already in this direction, we 
> just don't get around to doing them because for the most part the metric 
> system works good enough and there are bigger fish to fry.
> 
> On 6/1/2021 3:39 PM, Till Rohrmann wrote:
>> Hi Mason,
>> 
>> The idea is that a metric is not uniquely identified by its name alone but 
>> instead by its path. The groups in which it is defined specify this path 
>> (similar to directories). That's why it is valid to specify two metrics with 
>> the same name if they reside in different groups.
>> 
>> I think Prometheus does not support such a tree structure and that's why the 
>> path is exposed via labels if I am not mistaken. So long story short, what 
>> you are seeing is a combination of how Flink organizes metrics and what can 
>> be reported to Prometheus. 
>> 
>> I am also pulling in Chesnay who is more familiar with this part of the code.
>> 
>> Cheers,
>> Till
>> 
>> On Fri, May 28, 2021 at 7:33 PM Mason Chen > > wrote:
>> Can anyone give insight as to why Flink allows 2 metrics with the same 
>> “name”?
>> 
>> For example,
>> 
>> getRuntimeContext.addGroup(“group”, “group1”).counter(“myMetricName”);
>> 
>> And
>> 
>> getRuntimeContext.addGroup(“other_group”, 
>> “other_group1”).counter(“myMetricName”);
>> 
>> Are totally valid.
>> 
>> 
>> It seems that it has lead to some not-so-great implementations—the 
>> prometheus reporter and attaching the labels to the metric name, making the 
>> name quite verbose.
>> 
>> 
> 



Re: RabbitMQ source does not stop unless message arrives in queue

2021-06-01 Thread Jose Vargas
Hi all,

Apologies for not following up sooner. Thank you Austin for creating
FLINK-22698. It seems that the issue is well understood and a fix is
currently under development/review. Please let me know if there is anything
additional that I can do. I look forward to testing out a new version of
Flink that includes this fix.

Thanks again,
Jose

On Tue, May 18, 2021 at 4:38 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey all,
>
> Thanks for the details, John! Hmm, that doesn't look too good either 
> but probably a different issue with the RMQ source/ sink. Hopefully, the
> new FLIP-27 sources will help you guys out there! The upcoming HybridSource
> in FLIP-150 [1] might also be interesting to you in finely controlling
> sources.
>
> @Jose Vargas  I've created FLINK-22698 [2] to
> track your issue. Do you have a small reproducible case/ GitHub repo? Also,
> would you be able to provide a little bit more about the Flink job that you
> see this issue in? i.e. overall parallelism, the parallelism of the
> sources/ sinks, checkpointing mode.
>
> Best,
> Austin
>
> [1]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
> [2]: https://issues.apache.org/jira/browse/FLINK-22698
>
> On Thu, May 13, 2021 at 9:25 PM John Morrow 
> wrote:
>
>> Hi Jose, hey Austin!!
>>
>> I know we were just recently looking at trying to consume a fixed number
>> of messages from an RMQ source, process them and output them to an RMQ
>> sink. As a naive first attempt at stopping the job when the target number
>> of messages had been processed, we put counter state in the process
>> function and tried throwing an exception when the counter >= the target
>> message count.
>>
>> The job had:
>>
>>- parallelism: 1
>>- checkpointing: 1000 (1 sec)
>>- restartStrategy: noRestart
>>- prefetchCount: 100
>>
>> Running it with 150 messages in the input queue and 150 also as the
>> target number, at the end the queues had:
>>
>>- output queue - 150
>>- input queue - 50
>>
>> So it looks like it did transfer all the messages, but some unack'd ones
>> also got requeued back at the source, so they end up as duplicates. I know
>> throwing an exception in the Flink job is not the same as triggering a
>> stateful shutdown, but it might be hitting similar unack issues.
>>
>> John
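For reference, a rough, hypothetical sketch of the counting approach described above (the class, field, and state names are made up; it assumes a keyed stream, and throwing an exception is only the naive stop mechanism discussed in this thread, not a recommended shutdown strategy):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts processed messages in keyed state and fails the job once the target is reached.
public class CountingProcessFunction extends KeyedProcessFunction<String, String, String> {

    private final long targetCount;
    private transient ValueState<Long> counter;

    public CountingProcessFunction(long targetCount) {
        this.targetCount = targetCount;
    }

    @Override
    public void open(Configuration parameters) {
        counter = getRuntimeContext().getState(
                new ValueStateDescriptor<>("processed-count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        long current = counter.value() == null ? 0L : counter.value();
        counter.update(current + 1);
        out.collect(value);
        if (current + 1 >= targetCount) {
            // Failing the job does NOT ack the in-flight RMQ messages, which is why
            // unacked messages can be redelivered and show up as duplicates after restart.
            throw new RuntimeException("Target message count reached, stopping job");
        }
    }
}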
>>
>> --
>> *From:* Austin Cawley-Edwards 
>> *Sent:* Thursday 13 May 2021 16:49
>> *To:* Jose Vargas ; John Morrow <
>> johnniemor...@hotmail.com>
>> *Cc:* user 
>> *Subject:* Re: RabbitMQ source does not stop unless message arrives in
>> queue
>>
>> Hey Jose,
>>
>> Thanks for bringing this up – it indeed sounds like a bug. There is
>> ongoing work to update the RMQ source to the new interface, which might
>> address some of these issues (or should, if it is not already), tracked in
>> FLINK-20628[1]. Would you be able to create a JIRA issue for this, or would
>> you like me to?
>>
>> At my previous company, we only consumed one Rabbit queue per
>> application, so we didn't run into this exactly but did see other weird
>> behavior in the RMQ source that could be related. I'm going to cc @John
>> Morrow  who might be able to contribute to
>> what he's seen working with the source, if he's around. I remember some
>> messages not properly being ack'ed during a stateful shutdown via the
>> Ververica Platform's stop-with-savepoint functionality that you mention,
>> though that might be more related to FLINK-20244[2], perhaps.
>>
>>
>> Best,
>> Austin
>>
>> [1]: https://issues.apache.org/jira/browse/FLINK-20628
>> [2]: https://issues.apache.org/jira/browse/FLINK-20244
>>
>> On Thu, May 13, 2021 at 10:23 AM Jose Vargas 
>> wrote:
>>
>> Hi,
>>
>> I am using Flink 1.12 to read from and write to a RabbitMQ cluster.
>> Flink's RabbitMQ source has some surprising behavior when a
>> stop-with-savepoint request is made.
>>
>> *Expected Behavior:*
>> The stop-with-savepoint request stops the job with a FINISHED state.
>>
>> *Actual Behavior:*
>> The stop-with-savepoint request either times out or hangs indefinitely
>> unless a message arrives in all the queues that the job consumes from after
>> the stop-with-savepoint request is made.
>>
>>
>> I know that one possible workaround is to send a sentinel value to each
>> of the queues consumed by the job that the deserialization schema checks in
>> its isEndOfStream method. However, this is somewhat cumbersome and
>> complicates the continuous delivery of a Flink job. For example,
>> Ververica Platform will trigger a stop-with-savepoint for the user if one
>> of many possible Flink configurations for a job are changed. The
>> stop-with-savepoint can then hang indefinitely because only some of the
>> RabbitMQ sources will have reached a FINISHED state.
>>
>> I have attached the TaskManager thread dump after the stop-with-savepoint
>> request was made. Almost every thread is either sleeping or waiting around
>> for locks to be released, and then 

Re: Got exception when running the localhost cluster

2021-06-01 Thread Till Rohrmann
Hi Lingfeng,

Youngwoo is right. Flink currently officially supports Java 8 and Java 11.

Cheers,
Till

On Mon, May 31, 2021 at 9:00 AM Youngwoo Kim (김영우)  wrote:

> Hi Lingfeng,
>
> I believe Java 8 or 11 is appropriate for the Flink cluster at this point.
> I'm not sure that Flink 1.13 supports Java 16 officially.
>
> Thanks,
> Youngwoo
>
> On Mon, May 31, 2021 at 2:49 PM Lingfeng Pu  wrote:
>
>> Hi,
>>
>> I'm new to Flink. I got a problem when running the local cluster on my
>> computer. Some key software information as follows:
>>
>> 1. Flink version: 1.13.0 for Scala 2.11;
>> 2. OS: Fedora 34;
>> 3. Java version: 16;
>> 4. Scala version: 2.11.12.
>>
>> When I started up the local cluster from the command line, everything seemed
>> fine in the command line output, BUT I could not access localhost:8081; it
>> failed to open. Furthermore, an exception comes out when I run the
>> Flink example; please see all the details below:
>>
>> [root@localhost flink-1.13.0]# ./bin/start-cluster.sh
>> Starting cluster.
>> Starting standalonesession daemon on host fedora.
>> Starting taskexecutor daemon on host fedora.
>> [root@localhost flink-1.13.0]# ./bin/flink run
>> examples/streaming/WordCount.jar
>> Executing WordCount example with default input data set.
>> Use --input to specify file input.
>> Printing result to stdout. Use --output to specify output path.
>>
>> 
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Unable to make field private final byte[]
>> java.lang.String.value accessible: module java.base does not "opens
>> java.lang" to unnamed module @2baf3d81
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>> at
>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>> Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make
>> field private final byte[] java.lang.String.value accessible: module
>> java.base does not "opens java.lang" to unnamed module @2baf3d81
>> at
>> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:357)
>> at
>> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
>> at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:177)
>> at java.base/java.lang.reflect.Field.setAccessible(Field.java:171)
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:106)
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2053)
>> at
>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
>> at
>> org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
>> at
>> org.apache.flink.streaming.api.datastream.DataStream.print(DataStream.java:937)
>> at
>> org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:94)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
>> at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.base/java.lang.reflect.Method.invoke(Method.java:567)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>> ... 8 more
>>
>> I tried searching for solutions online, but found nothing useful so far. I
>> urgently need some specific advice about how to solve this issue! I'll be
>> grateful for that :)
>>
>>


Re: Flink Metrics Naming

2021-06-01 Thread Chesnay Schepler
The uniqueness of metrics and the naming of the Prometheus reporter are 
somewhat related but also somewhat orthogonal.


Prometheus works similar to JMX in that the metric name (e.g., 
taskmanager.job.task.operator.numRecordsIn) is more or less a _class_ of 
metrics, with tags/labels allowing you to select a specific instance of 
that metric.


Restricting metric names to 1 level of the hierarchy would present a few 
issues:
a) Effectively, all metric names that Flink uses become reserved 
keywords that users must not use, which will lead to headaches 
when adding more metrics or forwarding metrics from libraries (e.g., 
Kafka), because we could always break existing user-defined metrics.
b) You'd need a cluster-wide lookup that is aware of all hierarchies to 
ensure consistency across all processes.


In the end, there are significantly easier ways to solve the issue of 
the metric name being too long, i.e., give the user more control over 
the logical scope (taskmanager.job.task.operator), be it shortening the 
names (t.j.t.o), limiting the depth (e.g., operator.numRecordsIn), 
removing it outright (but I'd prefer some context to be present for 
clarity), or supporting something similar to scope formats.
I'm reasonably certain there are some tickets already in this direction; 
we just don't get around to doing them because, for the most part, the 
metric system works well enough and there are bigger fish to fry.
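For illustration, a minimal sketch of how nested groups form a metric's hierarchy (the group and metric names here are made up, not taken from this thread):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class MetricScopeExample extends RichMapFunction<String, String> {

    private transient Counter recordsSeen;

    @Override
    public void open(Configuration parameters) {
        // addGroup("myGroup", "group1") adds "myGroup" to the logical scope and exposes
        // "group1" as a variable; reporters like Prometheus turn such variables into labels.
        // A counter registered with the same name under a different group path is a
        // different metric, because the identifier is the full path, not just the name.
        recordsSeen = getRuntimeContext()
                .getMetricGroup()
                .addGroup("myGroup", "group1")
                .counter("recordsSeen");
    }

    @Override
    public String map(String value) {
        recordsSeen.inc();
        return value;
    }
}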


On 6/1/2021 3:39 PM, Till Rohrmann wrote:

Hi Mason,

The idea is that a metric is not uniquely identified by its name alone 
but instead by its path. The groups in which it is defined specify 
this path (similar to directories). That's why it is valid to specify 
two metrics with the same name if they reside in different groups.


I think Prometheus does not support such a tree structure and that's 
why the path is exposed via labels if I am not mistaken. So long story 
short, what you are seeing is a combination of how Flink organizes 
metrics and what can be reported to Prometheus.


I am also pulling in Chesnay who is more familiar with this part of 
the code.


Cheers,
Till

On Fri, May 28, 2021 at 7:33 PM Mason Chen wrote:


Can anyone give insight as to why Flink allows 2 metrics with the
same “name”?

For example,

getRuntimeContext.addGroup(“group”, “group1”).counter(“myMetricName”);

And

getRuntimeContext.addGroup(“other_group”,
“other_group1”).counter(“myMetricName”);

Are totally valid.


It seems that it has led to some not-so-great implementations—the
Prometheus reporter attaching the labels to the metric name,
making the name quite verbose.






Re: Flink kafka

2021-06-01 Thread Till Rohrmann
Responded as part of the following discussion
https://lists.apache.org/x/thread.html/re85a1fb3f17b5ef1af2844fc1a07b6d2f5e6237bf4c33059e13890ee@%3Cuser.flink.apache.org%3E.
Let's continue the discussion there.

Cheers,
Till

On Sun, May 30, 2021 at 2:32 PM 周瑞  wrote:

>
> The program is used to test Flink Kafka exactly-once. A normal submission runs fine,
> but recovering from a savepoint produces the error below.
> On the Kafka server side, transaction.timeout.ms = 90 is configured.
>
> public static final String KAFKA_TABLE_FORMAT =
> "CREATE TABLE "+TABLE_NAME+" (\n" +
> "  "+COLUMN_NAME+" STRING\n" +
> ") WITH (\n" +
> "   'connector' = 'kafka',\n" +
> "   'topic' = '%s',\n" +
> "   'properties.bootstrap.servers' = '%s',\n" +
> "   'sink.semantic' = 'exactly-once',\n" +
> "   'properties.transaction.timeout.ms' = '90',\n" +
> "   'sink.partitioner' = 
> 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
> "   'properties.max.in.flight.requests.per.connection' = 
> '1',\n" +
> "   'properties.enable.idempotence' = 'true',\n" +
> "   'properties.transactional.id' = '%s',\n" +
> "   'format' = 'dbz-json'\n" +
> ")\n";
>
>
> 2021-05-30 19:38:57,513 WARN org.apache.flink.runtime.taskmanager.Task []
> - Source: TableSourceScan(table=[[default_catalog, default_database,
> debezium_source]], fields=[data]) -> Sink: Sink
> (table=[default_catalog.default_database.KafkaTable2], fields=[data]) (1/1
> )#144518 (4739d37a5f82268901f8fb51b39735e9) switched from INITIALIZING to
> FAILED with failure cause: org.apache.kafka.common.KafkaException:
> Unexpected error in InitProducerIdResponse; Producer attempted an
> operation with an old epoch. Either there is a newer producer with the
> same transactionalId, or the producer's transaction has been expired by
> the broker.
> at org.apache.kafka.clients.producer.internals.
> TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager
> .java:1352)
> at org.apache.kafka.clients.producer.internals.
> TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:
> 1260)
> at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse
> .java:109)
> at org.apache.kafka.clients.NetworkClient.completeResponses(
> NetworkClient.java:572)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
> at org.apache.kafka.clients.producer.internals.Sender
> .maybeSendAndPollTransactionalRequest(Sender.java:414)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender
> .java:312)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:
> 239)
> at java.lang.Thread.run(Thread.java:748)
>
> I looked up related material on Google but still could not resolve this. Has anyone run into a similar problem, or can someone suggest how to troubleshoot it?
>


Re: classloader.resolve-order is not honored when submitting job to a remote cluster

2021-06-01 Thread Till Rohrmann
Hi Tao,

I think this looks like a bug to me. Could it be that this problem is
covered by [1, 2]? Maybe you can review this PR and check whether it solves
the problem. If yes, then let's quickly get it in.

[1] https://issues.apache.org/jira/browse/FLINK-21445
[2] https://github.com/apache/flink/pull/15020

Cheers,
Till

On Sun, May 30, 2021 at 9:41 AM tao xiao  wrote:

> Hi team,
>
> I discovered that the child-first class loader is always used to initialize
> the main program when submitting the job to a YARN cluster using
> application mode, regardless of what value classloader.resolve-order is set to
> in flink-conf.yaml. But this is not the case if I submit the same job with
> the same config to the local cluster, which honors the config and uses the
> correct class loader to load the main program. Here is the log from the local
> cluster:
>
> 2021-05-30 15:01:16,372 INFO  org.apache.flink.client.cli.CliFrontend
>  [] -
> 
> 2021-05-30 15:01:16,375 INFO  org.apache.flink.client.cli.CliFrontend
>  [] -  Starting Command Line Client (Version: 1.12.1,
> Scala: 2.11, Rev:dc404e2, Date:2021-01-09T14:46:36+01:00)
> [trim down the log]
> *2021-05-30 15:01:16,616 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: classloader.resolve-order, parent-first*
> 2021-05-30 15:01:16,763 WARN  org.apache.flink.runtime.util.HadoopUtils
>  [] - Could not find Hadoop configuration via any of the
> supported methods (Flink configuration, environment variables).
> [trim down the log]
> 2021-05-30 15:01:16,830 INFO  org.apache.flink.client.ClientUtils
>  [] - Starting program (detached: false)
> *2021-05-30 15:01:16,871 INFO  io.demo.flink.WordCount
>  [] - Loaded by
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ParentFirstClassLoader@14c053c6*
>
> Here is the log from yarn cluster
> 2021-05-30 07:20:14,434 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> 
> 2021-05-30 07:20:14,438 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>  Starting YarnApplicationClusterEntryPoint (Version: 1.12.1, Scala: 2.11,
> Rev:dc404e2, Date:2021-01-09T14:46:36+01:00)
> [trim down the log]
> 2021-05-30 07:20:15,205 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: taskmanager.memory.process.size, 2048m
> *2021-05-30 07:20:15,205 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: classloader.resolve-order, parent-first*
> 2021-05-30 07:20:15,205 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: metrics.scope.jm, flink.jobmanager
> [trim down the log]
> *2021-05-30 07:20:21,383 INFO  io.demo.flink.WordCount
>  [] - Loaded by
> org.apache.flink.util.ChildFirstClassLoader@3da30852*
>
> Here is the job to reproduce the problem
>
> public static void main(String[] args) throws Exception {
>
>  // set up the execution environment
>  final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>  LOG.info("Loaded by {}", WordCount.class.getClassLoader());
>  // get input data
>  DataStreamSource text = env.fromElements(
>"To be, or not to be,--that is the question:--",
>"Whether 'tis nobler in the mind to suffer",
>"The slings and arrows of outrageous fortune",
>"Or to take arms against a sea of troubles,"
>);
>
>text.print();
>  env.execute("demo job");
>
> }
>
>
> Flink version 1.12.1
>
> I believe the inconsistency is the result of the user-defined flink-conf not
> being passed to PackagedProgram, which uses the default config instead
>
>
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java#L109
>
> Not sure if this is expected behavior, i.e., that we never assume the main
> program is loaded with the configured class loader.
> --
> Regards,
> Tao
>


Re: Error about "Rejected TaskExecutor registration at the ResourceManger"

2021-06-01 Thread Till Rohrmann
Hi Kai,

The rejection you are seeing should not be serious. The way this can happen
is the following: If Yarn restarts the application master, Flink will try
to recover previously started containers. If this is not possible or Yarn
only tells about a subset of the previously allocated containers, then it
can happen that if a container that has not been reported to the new
ResourceManager tries to register is rejected because it is not known. The
idea behind this behaviour is to only accept those resources which one has
knowingly requested in order to free other resources which might belong to
another Yarn application.

In any case, the newly started Flink ResourceManager should request new
containers so that there are enough TaskManagers available to run your job
(assuming that the Yarn cluster has enough resources). Hence, the cluster
should recover from this situation and there should not be a lot to worry
about.

Cheers,
Till

On Sun, May 30, 2021 at 7:36 AM Kai Fu  wrote:

> Hi team,
>
> We encountered an issue during recovery from a checkpoint. It's recovering
> because the downstream Kafka sink was full for a while, so the job failed
> and kept trying to recover (the downstream was full for about 4 hours). The
> job cannot recover from the checkpoint successfully, even after we scaled up
> the Kafka cluster, and shows the following exception. Is there any guidance
> on how to locate and avoid this kind of issue?
>
>
> *2021-05-30 01:31:21,419 INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
> Connecting to ResourceManager
> akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_*().2021-05-30
> 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor
> [] - Resolved ResourceManager address, beginning registration2021-05-30
> 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor
> [] - Fatal error occurred in TaskExecutor
> akka.tcp://flink@ip-10-61-98-204.ec2.internal:36631/user/rpc/taskmanager_0.org.apache.flink.util.FlinkException:
> The TaskExecutor's registration at the ResourceManager
> akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_*
> has been rejected: Rejected TaskExecutor registration at the ResourceManger
> because: The ResourceManager does not recognize this TaskExecutor.
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]at
> org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]at
> org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]at
> org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> ~[?:1.8.0_272]at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> ~[?:1.8.0_272]at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> ~[?:1.8.0_272]at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> [flink-dist_2.11-1.13.1.jar:1.13.1]at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> [flink-dist_2.11-1.13.1.jar:1.13.1]at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> [flink-dist_2.11-1.13.1.jar:1.13.1]at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> [flink-dist_2.11-1.13.1.jar:1.13.1]at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> [flink-dist_2.11-1.13.1.jar:1.13.1]at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [flink-dist_2.11-1.13.1.jar:1.13.1]at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [flink-dist_2.11-1.13.1.jar:1.13.1]at
> 

Re: Getting Abstract method Error with flink 1.13.0 and 1.12.0

2021-06-01 Thread Chesnay Schepler
It appears as if flink-siddhi was build against Flink 1.9.0, so it may 
just not be compatible with later versions of Flink.


You will either need to reach out to the maintainers of flink-siddhi, or 
migrate it to a later Flink version yourself.


On 6/1/2021 3:11 PM, Dipanjan Mazumder wrote:


Hi,

   I have integrated the flink-siddhi library
([com.github.haoch/flink-siddhi_2.11 "0.2.2-SNAPSHOT"])
and I tried to configure and implement a control stream from
flink-siddhi, and it broke with an AbstractMethodError. When I tried
running the same with Flink 1.11.0, it worked.

More details are given in this Stack Overflow link: Flink-Siddhi
control event failing to start



Any help on this would be greatly appreciated and will help me move forward:








Flink-Siddhi control event failing to start

While trying to configure and implement flink-
siddhi(https://clojars.org/com.github.haoch/flink-siddhi_2.11/ver...

[com.github.haoch/flink-siddhi_2.11 "0.2.2-SNAPSHOT"]




Regards
Dipanjan





Re: Flink Metrics Naming

2021-06-01 Thread Till Rohrmann
Hi Mason,

The idea is that a metric is not uniquely identified by its name alone but
instead by its path. The groups in which it is defined specify this path
(similar to directories). That's why it is valid to specify two metrics
with the same name if they reside in different groups.

I think Prometheus does not support such a tree structure and that's why
the path is exposed via labels if I am not mistaken. So long story short,
what you are seeing is a combination of how Flink organizes metrics and
what can be reported to Prometheus.

I am also pulling in Chesnay who is more familiar with this part of the
code.

Cheers,
Till

On Fri, May 28, 2021 at 7:33 PM Mason Chen  wrote:

> Can anyone give insight as to why Flink allows 2 metrics with the same
> “name”?
>
> For example,
>
> getRuntimeContext.addGroup(“group”, “group1”).counter(“myMetricName”);
>
> And
>
> getRuntimeContext.addGroup(“other_group”,
> “other_group1”).counter(“myMetricName”);
>
> Are totally valid.
>
>
> It seems that it has led to some not-so-great implementations—the
> Prometheus reporter attaching the labels to the metric name, making the
> name quite verbose.
>
>
>


Re: How to check is Kafka partition "idle" in emitRecordsWithTimestamps

2021-06-01 Thread Till Rohrmann
Hi Alexey,

looking at KafkaTopicPartitionState, it looks like it does not contain
this information. In a nutshell, what you probably have to do is to
aggregate the watermarks across all partitions and then pause the
consumption of a partition if its watermark advances too far relative to the
minimum watermark. However, this will stall the whole reading process if
there is a partition which has no more data. Hence, you will probably also
need a mechanism to advance the watermark if the partition becomes idle.

Note that the community is currently working on a new KafkaConnector based
on Flink's new source API (FLIP-27). If I am not mistaken, then these new
interfaces should eventually also support event time alignment.

Cheers,
Till
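A very simplified, self-contained sketch of the alignment idea described above (illustrative only; it does not use Flink's actual AbstractFetcher or KafkaTopicPartitionState APIs, and all names are made up):

import java.util.HashMap;
import java.util.Map;

// Tracks per-partition watermarks and decides which partitions to pause
// when they run too far ahead of the slowest (minimum) watermark.
public class PartitionAlignmentTracker {

    private final long maxDriftMillis;
    private final Map<String, Long> watermarkPerPartition = new HashMap<>();

    public PartitionAlignmentTracker(long maxDriftMillis) {
        this.maxDriftMillis = maxDriftMillis;
    }

    public void onWatermark(String partition, long watermark) {
        watermarkPerPartition.merge(partition, watermark, (a, b) -> Math.max(a, b));
    }

    /** A partition should be paused if its watermark is too far ahead of the minimum. */
    public boolean shouldPause(String partition) {
        long min = watermarkPerPartition.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
        long current = watermarkPerPartition.getOrDefault(partition, Long.MIN_VALUE);
        return current - min > maxDriftMillis;
    }

    /** Idle partitions must not hold back the minimum forever; drop them from the aggregation. */
    public void markIdle(String partition) {
        watermarkPerPartition.remove(partition);
    }
}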

On Fri, May 28, 2021 at 7:17 PM Alexey Trenikhun  wrote:

> Hello,
> I'm thinking about implementing a custom Kafka connector which provides
> event-time alignment (similar to FLINK-10921, which seems abandoned). What is
> the way to determine whether a partition is idle from an override
> of AbstractFetcher.emitRecordsWithTimestamps()?
> Does KafkaTopicPartitionState have this information?
>
> Thanks,
> Alexey
>


Re: recover from savepoint

2021-06-01 Thread Till Rohrmann
The error message says that we are trying to reuse a transaction id that is
currently being used or has expired.

I am not 100% sure how this can happen. My suspicion is that you have
resumed a job multiple times from the same savepoint. Have you checked that
there is no other job which has been resumed from the same savepoint and
which is currently running or has run and completed checkpoints?

@pnowojski  @Becket Qin  how
does the transaction id generation ensures that we don't have a clash of
transaction ids if we resume the same job multiple times from the same
savepoint? From the code, I do see that we have a TransactionalIdsGenerator
which is initialized with the taskName and the operator UID.

fyi: @Arvid Heise 

Cheers,
Till


On Mon, May 31, 2021 at 11:10 AM 周瑞  wrote:

> Hi:
>   When "sink.semantic = exactly-once", the following exception is
> thrown when recovering from a savepoint
>
>public static final String KAFKA_TABLE_FORMAT =
> "CREATE TABLE "+TABLE_NAME+" (\n" +
> "  "+COLUMN_NAME+" STRING\n" +
> ") WITH (\n" +
> "   'connector' = 'kafka',\n" +
> "   'topic' = '%s',\n" +
> "   'properties.bootstrap.servers' = '%s',\n" +
> "   'sink.semantic' = 'exactly-once',\n" +
> "   'properties.transaction.timeout.ms' =
> '90',\n" +
> "   'sink.partitioner' =
> 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
> "   'format' = 'dbz-json'\n" +
> ")\n";
>   [] - Source: TableSourceScan(table=[[default_catalog, default_database,
> debezium_source]], fields=[data]) -> Sink: Sink
> (table=[default_catalog.default_database.KafkaTable], fields=[data]) (1/1
> )#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to
> FAILED with failure cause: org.apache.kafka.common.KafkaException:
> Unexpected error in InitProducerIdResponse; Producer attempted an
> operation with an old epoch. Either there is a newer producer with the
> same transactionalId, or the producer's transaction has been expired by
> the broker.
> at org.apache.kafka.clients.producer.internals.
> TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager
> .java:1352)
> at org.apache.kafka.clients.producer.internals.
> TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:
> 1260)
> at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse
> .java:109)
> at org.apache.kafka.clients.NetworkClient.completeResponses(
> NetworkClient.java:572)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
> at org.apache.kafka.clients.producer.internals.Sender
> .maybeSendAndPollTransactionalRequest(Sender.java:414)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender
> .java:312)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:
> 239)
> at java.lang.Thread.run(Thread.java:748)
>


Re: Getting Abstract method Error with flink 1.13.0 and 1.12.0

2021-06-01 Thread Dipanjan Mazumder
 

Hi,
   I have integrated the flink-siddhi library ([com.github.haoch/flink-siddhi_2.11 
"0.2.2-SNAPSHOT"]) and I tried to configure and implement a control stream from 
flink-siddhi, and it broke with an AbstractMethodError. When I tried running the 
same with Flink 1.11.0, it worked.
More details are given in this Stack Overflow link: Flink-Siddhi control event 
failing to start
Any help on this would be greatly appreciated and will help me move forward:

Flink-Siddhi control event failing to start

While trying to configure and implement flink- 
siddhi(https://clojars.org/com.github.haoch/flink-siddhi_2.11/ver...

[com.github.haoch/flink-siddhi_2.11 "0.2.2-SNAPSHOT"]


Regards
Dipanjan

  

Re: Getting Abstract method Error with flink 1.13.0 and 1.12.0

2021-06-01 Thread Dipanjan Mazumder
 Thanks Till, will do so ...
On Tuesday, June 1, 2021, 06:22:32 PM GMT+5:30, Till Rohrmann 
 wrote:  
 
 Hi Dipanjan,
this type of question is best sent to Flink's user mailing list because there 
are a lot more people using Flink who could help you. The dev mailing list is 
intended to be used for development discussions.
Cheers,
Till
On Tue, Jun 1, 2021 at 1:31 PM Dipanjan Mazumder  wrote:

Hi,
   I have integrated the flink-siddhi library ([com.github.haoch/flink-siddhi_2.11 
"0.2.2-SNAPSHOT"]) and I tried to configure and implement a control stream from 
flink-siddhi, and it broke with an AbstractMethodError. When I tried running the 
same with Flink 1.11.0, it worked.
More details are given in this Stack Overflow link: Flink-Siddhi control event 
failing to start
Any help on this would be greatly appreciated and will help me move forward:

Flink-Siddhi control event failing to start

While trying to configure and implement flink- 
siddhi(https://clojars.org/com.github.haoch/flink-siddhi_2.11/ver...

[com.github.haoch/flink-siddhi_2.11 "0.2.2-SNAPSHOT"]


Regards
Dipanjan

  

Re: Getting Abstract method Error with flink 1.13.0 and 1.12.0

2021-06-01 Thread Till Rohrmann
Hi Dipanjan,

this type of question is best sent to Flink's user mailing list because
there are a lot more people using Flink who could help you. The dev mailing
list is intended to be used for development discussions.

Cheers,
Till

On Tue, Jun 1, 2021 at 1:31 PM Dipanjan Mazumder  wrote:

> Hi ,
>
> I have integrated the flink-siddhi library ([com.github.haoch/flink-siddhi_2.11
> "0.2.2-SNAPSHOT"])
> and I tried to configure and implement a control stream from flink-siddhi and
> it broke with an AbstractMethodError. When I tried running the same with Flink
> 1.11.0, it worked.
>
> More details are given in this Stack Overflow link: Flink-Siddhi control
> event failing to start
> 
>
> Any help on this would be greatly appreciated and will help me move forward:
>
> Flink-Siddhi control event failing to start
>
> While trying to configure and implement flink-
> siddhi(https://clojars.org/com.github.haoch/flink-siddhi_2.11/ver...
>
> 
>
>
>
> [com.github.haoch/flink-siddhi_2.11 "0.2.2-SNAPSHOT"]
>
>
> 
>
> Regards
> Dipanjan
>
>


Re: Which time type do window functions require

2021-06-01 Thread guoyb
Thanks! I'll debug it again when I'm back at the office tomorrow.



---Original Message---
From: "MOBIN"<18814118...@163.com
Sent: Tuesday, June 1, 2021, 7:41 PM
To: "user-zh@flink.apache.org"https://help.aliyun.com/document_detail/62512.html?spm=a2c4g.11186623.6.827.49531b09XfgsU7




| |
MOBIN
|
Signature customized by NetEase Mail Master


On June 1, 2021 at 19:37, guoyb<861277...@qq.com wrote:
OK, thanks!


I'll give it a try



---Original Message---
From: "Shuo Cheng"

Unsubscribe

2021-06-01 Thread 5599


Re: Which time type do window functions require

2021-06-01 Thread MOBIN
Also, you can refer to Alibaba's demo:
https://help.aliyun.com/document_detail/62512.html?spm=a2c4g.11186623.6.827.49531b09XfgsU7




| |
MOBIN
|
Signature customized by NetEase Mail Master


On June 1, 2021 at 19:37, guoyb<861277...@qq.com> wrote:
OK, thanks!


I'll give it a try



---Original Message---
From: "Shuo Cheng"

Re: Which time type do window functions require

2021-06-01 Thread guoyb
OK, thanks!


I'll give it a try



---Original Message---
From: "Shuo Cheng"

Re: Which time type do window functions require

2021-06-01 Thread Shuo Cheng
A window in a SQL streaming job can be defined on two kinds of time attribute columns:
1) event time: in the DDL you need to define a watermark on the timestamp column
2) processing time: use PROCTIME()
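For illustration, a minimal sketch of both options, with the DDL embedded in Java the way other snippets in this digest do (all table, column, and connector choices here are made-up examples):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class WindowTimeAttributeExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Event time: the column must be TIMESTAMP(3)/TIMESTAMP_LTZ(3) and a WATERMARK
        // must be declared on it; proc_time is a processing-time attribute via PROCTIME().
        tEnv.executeSql(
                "CREATE TABLE events (\n"
                        + "  user_id STRING,\n"
                        + "  event_time TIMESTAMP(3),\n"
                        + "  proc_time AS PROCTIME(),\n"
                        + "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND\n"
                        + ") WITH (\n"
                        + "  'connector' = 'datagen'\n"
                        + ")");

        // Tumbling window on the event-time attribute; use proc_time instead for processing time.
        tEnv.executeSql(
                "SELECT user_id, COUNT(*) AS cnt,\n"
                        + "       TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS wstart\n"
                        + "FROM events\n"
                        + "GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' MINUTE)");
    }
}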

On 6/1/21, guoyb <861277...@qq.com> wrote:
> Yes.
>
>
> Could an expert point me in the right direction to solve this problem?
>
>
>
> ---Original Message---
> From: "MOBIN"<18814118...@163.com
> Sent: Tuesday, June 1, 2021, 7:09 PM
> To: "user-zh@flink.apache.org" Subject: Re: Which time type do window functions require
>
>
> Is the error you are getting similar to the one below?
> Window aggregate can only be defined over a time attribute column, but
> TIMESTAMP(3) encountered
>
>
> | |
> MOBIN
> |
> Signature customized by NetEase Mail Master
>
>
> On June 1, 2021 at 19:00, guoyb<861277...@qq.com wrote:
> For a tumble() window, what time type does the event time column actually need? It keeps reporting that the time is wrong.
> timestamp(3)
> datetime
> time
> I have tried all of them; none of them worked.


Re: PyFlink JDBC question

2021-06-01 Thread ????
??




----
??: 
   "user-zh"

https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q 
pyflink??jdbc??jar??jdbc??flink??1.13.1
 gt; 
 gt; 
 gt; 
 gt; from pyflink.datastream import StreamExecutionEnvironment
 gt; from pyflink.table import StreamTableEnvironment, 
EnvironmentSettings
 gt; env = StreamExecutionEnvironment.get_execution_environment()
 gt; table_env = StreamTableEnvironment.create( 
env,environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
 gt; 
table_env.get_config().get_configuration().set_string("pipeline.classpaths", 
"file:///home/flink/lib/flink-connector-jdbc_2.12-1.13.1.jar;file:///home/flink/lib/mysql-connector-java-8.0.25.jar")
 gt; 
 gt; 
 gt; # 2. create source Table
 gt; table_env.execute_sql("""
 gt; 
 gt; 
 gt; CREATE TABLE table_source (
 gt; amp;nbsp; e string
 gt; ) WITH (
 gt; amp;nbsp;'connector' = 'jdbc',
 gt; amp;nbsp; 'url' = 'jdbc:mysql://:3306/test',
 gt; amp;nbsp; 'driver' = 'com.mysql.cj.jdbc.Driver',
 gt; amp;nbsp; 'table-name' = 'enum_test',
 gt; amp;nbsp; 'username' = 'pms_etl',
 gt; amp;nbsp; 'password' = 'pms_etl_q'
 gt; )
 gt; 
 gt; 
 gt; """)
 gt; 
 gt; 
 gt; # 3. create sink Table
 gt; table_env.execute_sql("""
 gt; amp;nbsp; amp;nbsp; CREATE TABLE print (
 gt; amp;nbsp; amp;nbsp; amp;nbsp; amp;nbsp; e 
string
 gt; amp;nbsp; amp;nbsp; ) WITH (
 gt; amp;nbsp; amp;nbsp; amp;nbsp; amp;nbsp; 
'connector' = 'print'
 gt; amp;nbsp; amp;nbsp; )
 gt; """)
 gt; amp;nbsp; amp;nbsp;
 gt; 
 gt; 
 gt; table_env.execute_sql("INSERT INTO table_sink SELECT * FROM 
table_source").wait()
 gt; 
 gt; 
 gt; 
 gt; python
 gt; 
 gt; 
 gt; Traceback (most recent call last):
 gt; amp;nbsp; File "demo.py", line 41, in 

Re: PyFlink JDBC question

2021-06-01 Thread Dian Fu
Try it like this, changing ”\” to ”/“:

file:///D:/Pyproject/flink-connector-jdbc_2.11-1.13.1.jar



> On Jun 1, 2021, at 5:40 PM, 琴师 <1129656...@qq.com> wrote:
> 
> One more question: when I use PyCharm I cannot reference a Windows path, e.g. file:///D:\Pyproject\flink-connector-jdbc_2.11-1.13.1.jar;
>   it cannot be loaded that way. Has anyone used this before?
> 
> 
> --Original Message--
> From:  
>   "琴师"
> 
> <1129656...@qq.com;
> Sent: Tuesday, June 1, 2021, 5:30 PM
> To: "user-zh" 
> Subject: Re: PyFlink JDBC question
> 
> 
> 
> 
> 
> Thanks, after switching to 2.11 it indeed works.
> 
> 
> -- Original Message --
> From:  
>   "user-zh"   
>  
>  Sent: Tuesday, June 1, 2021, 5:04 PM
> To: "user-zh" 
> Subject: Re: PyFlink JDBC question
> 
> 
> 
> Hi,
> 
> Local execution:
> 1) Try flink-connector-jdbc_2.11-1.13.1.jar? The JAR bundled with PyFlink by default is built for Scala 2.11.
> 
> 
> flink run:
> 1) The sink table you registered is named “print”, not ”table_sink”, but the SQL statement uses table_sink.
> 
> 
>  On Jun 1, 2021, at 4:33 PM, 琴师 <1129656...@qq.com wrote:
>  
>  Hi,
>  Following the WeChat article https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q 
> I tried to use PyFlink and ran into a problem. I referenced the JDBC JAR, but it still reports a JDBC error. My Flink version is 1.13.1.
>  My original code is as follows:
>  
>  
>  from pyflink.datastream import StreamExecutionEnvironment
>  from pyflink.table import StreamTableEnvironment, EnvironmentSettings
>  env = StreamExecutionEnvironment.get_execution_environment()
>  table_env = StreamTableEnvironment.create( 
> env,environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
>  
> table_env.get_config().get_configuration().set_string("pipeline.classpaths", 
> "file:///home/flink/lib/flink-connector-jdbc_2.12-1.13.1.jar;file:///home/flink/lib/mysql-connector-java-8.0.25.jar")
>  
>  
>  # 2. create source Table
>  table_env.execute_sql("""
>  
>  
>  CREATE TABLE table_source (
>  nbsp; e string
>  ) WITH (
>  nbsp;'connector' = 'jdbc',
>  nbsp; 'url' = 'jdbc:mysql://:3306/test',
>  nbsp; 'driver' = 'com.mysql.cj.jdbc.Driver',
>  nbsp; 'table-name' = 'enum_test',
>  nbsp; 'username' = 'pms_etl',
>  nbsp; 'password' = 'pms_etl_q'
>  )
>  
>  
>  """)
>  
>  
>  # 3. create sink Table
>  table_env.execute_sql("""
>  nbsp; nbsp; CREATE TABLE print (
>  nbsp; nbsp; nbsp; nbsp; e string
>  nbsp; nbsp; ) WITH (
>  nbsp; nbsp; nbsp; nbsp; 'connector' = 'print'
>  nbsp; nbsp; )
>  """)
>  nbsp; nbsp;
>  
>  
>  table_env.execute_sql("INSERT INTO table_sink SELECT * FROM 
> table_source").wait()
>  
>  
>  
>  When I run it directly with python, the returned error is as follows
>  
>  
>  Traceback (most recent call last):
>  nbsp; File "demo.py", line 41, in   nbsp; nbsp; table_env.execute_sql("INSERT INTO table_sink 
> SELECT * FROM table_source").wait()
>  nbsp; File 
> "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", 
> line 804, in execute_sql
>  nbsp; nbsp; return TableResult(self._j_tenv.executeSql(stmt))
>  nbsp; File 
> "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in 
> __call__
>  nbsp; nbsp; answer, self.gateway_client, self.target_id, 
> self.name)
>  nbsp; File 
> "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 
> 146, in deco
>  nbsp; nbsp; return f(*a, **kw)
>  nbsp; File 
> "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in 
> get_return_value
>  nbsp; nbsp; format(target_id, ".", name), value)
>  py4j.protocol.Py4JJavaError: An error occurred while calling 
> o4.executeSql.
>  : org.apache.flink.table.api.ValidationException: Unable to create a 
> source for reading table 'default_catalog.default_database.table_source'.
>  
>  
>  Table options are:
>  
>  
>  'connector'='jdbc'
>  'driver'='com.mysql.cj.jdbc.Driver'
>  'password'='pms_etl_q'
>  'table-name'='enum_test'
>  'url'='jdbc:mysql://***:3306/test'
>  'username'='pms_etl'
>  nbsp; nbsp; nbsp; nbsp; at 
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
>  nbsp; nbsp; nbsp; nbsp; at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
>  nbsp; nbsp; nbsp; nbsp; at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
>  nbsp; nbsp; nbsp; nbsp; at 
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
>  nbsp; nbsp; nbsp; nbsp; at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
>  nbsp; nbsp; nbsp; nbsp; at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
>  nbsp; nbsp; nbsp; nbsp; at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
>  nbsp; nbsp; nbsp; nbsp; at 
> 

Re: Which time type do window functions require

2021-06-01 Thread guoyb
Yes.


Could an expert point me in the right direction to solve this problem?



---Original Message---
From: "MOBIN"<18814118...@163.com
Sent: Tuesday, June 1, 2021, 7:09 PM
To: "user-zh@flink.apache.org"

Re: Which time type do window functions require

2021-06-01 Thread MOBIN
Is the error you are getting similar to the one below?
Window aggregate can only be defined over a time attribute column, but 
TIMESTAMP(3) encountered


| |
MOBIN
|
Signature customized by NetEase Mail Master


On June 1, 2021 at 19:00, guoyb<861277...@qq.com> wrote:
For a tumble() window, what time type does the event time column actually need? It keeps reporting that the time is wrong.
timestamp(3)
datetime
time
I have tried all of them; none of them worked.

Which time type do window functions require

2021-06-01 Thread guoyb
For a tumble() window, what time type does the event time column actually need? It keeps reporting that the time is wrong.
timestamp(3)
datetime
time
I have tried all of them; none of them worked.

Re: How to use or configure flink checkpointing with siddhi internal state

2021-06-01 Thread Dipanjan Mazumder
 Hi Till,
    Thanks, so that means it should work; I will try and see.
Regards
Dipanjan
On Tuesday, June 1, 2021, 01:48:19 PM GMT+5:30, Till Rohrmann 
 wrote:  
 
 Hi Dipanjan,
I am assuming that you are using the flink-siddhi library [1]. I am not an 
expert but it looks as if the AbstractSiddhiOperator overrides the 
snapshotState [2] method to store the Siddhi state in Flink.
[1] https://github.com/haoch/flink-siddhi
[2] https://github.com/haoch/flink-siddhi/blob/master/core/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java#L331
Cheers,
Till
On Mon, May 31, 2021 at 7:27 PM Dipanjan Mazumder  
wrote:

Hi,
   I was trying to do checkpointing while using Siddhi as the CEP engine 
running on Flink. When using Siddhi windowing, it uses internal state to 
aggregate or perform operations on a bucket of events pertaining to a specific 
time window. But what I am not sure about is how that state can be mapped to Flink's 
internal state, so that I can use Flink checkpointing to safeguard the internal 
state of the Siddhi operators in the face of failures.
Any help or pointer on this would be of great help to me. Thanks in advance.
Dipanjan -
  

Re: Error when recovering from a savepoint under Kafka exactly-once semantics

2021-06-01 Thread r pp
 'properties.transaction.timeout.ms' = '3' is configured too short; the
transactionalId expires after 30s. The job probably does not even get a chance to start.
 The original text from the official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions
that were started before taking a checkpoint, after recovering from the
said checkpoint. If the time between Flink application crash and completed
restart is larger than Kafka’s transaction timeout there will be data loss
(Kafka will automatically abort transactions that exceeded timeout time).
Having this in mind, please configure your transaction timeout
appropriately to your expected down times.
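As a hedged illustration of that advice (the topic, bootstrap servers, and the 15-minute value below are placeholders): keep the producer-side transaction timeout comfortably larger than the checkpoint interval plus the longest expected downtime, and no larger than the broker's transaction.max.timeout.ms (default 15 minutes, which may also need to be raised on the broker).

public class ExactlyOnceKafkaSinkTimeoutExample {
    public static void main(String[] args) {
        // Example DDL only; adjust the timeout to your expected recovery time.
        String sinkDdl =
                "CREATE TABLE kafka_sink (\n"
                        + "  data STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'my_topic',\n"
                        + "  'properties.bootstrap.servers' = 'broker:9092',\n"
                        + "  'sink.semantic' = 'exactly-once',\n"
                        + "  'properties.transaction.timeout.ms' = '900000',\n" // 15 minutes
                        + "  'format' = 'json'\n"
                        + ")";
        System.out.println(sinkDdl);
    }
}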

周瑞 wrote on Tue, Jun 1, 2021 at 3:45 PM:

>
> Hello: Under exactly-once semantics, Kafka reports an error when recovering from a savepoint. Preliminary investigation suggests the Kafka transaction is using an old epoch. How should this problem be handled?
> //todo pass this in via configuration
> env.setParallelism(1);
> env.enableCheckpointing(60L, CheckpointingMode.EXACTLY_ONCE);
>
> // checkpoint retention policy (retain the checkpoint even if the job is explicitly cancelled)
> env.getCheckpointConfig()
>
> .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.getCheckpointConfig().setCheckpointTimeout(6);
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
>
> //TODO HDFS must be used in production
> env.setStateBackend(new FsStateBackend("hdfs://
> 10.10.98.226:8020/tmp/checkpoint66"));
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> public static final  String TABLE_NAME = "KafkaTable";
> public static final  String COLUMN_NAME = "source_value";
>
> public static final String KAFKA_TABLE_FORMAT =
> "CREATE TABLE "+TABLE_NAME+" (\n" +
> "  "+COLUMN_NAME+" STRING\n" +
> ") WITH (\n" +
> "   'connector' = 'kafka',\n" +
> "   'topic' = '%s',\n" +
> "   'properties.bootstrap.servers' = '%s',\n" +
> "   'sink.semantic' = 'exactly-once',\n" +
> "   'properties.transaction.timeout.ms' = '3',\n" +
> "   'format' = 'dbz-json'\n" +
> ")\n";
> org.apache.kafka.common.errors.ProducerFencedException: Producer attempted
> an operation with an old epoch. Either there is a newer producer with the
> same transactionalId,
>  or the producer's transaction has been expired by the broker. while
> recovering transaction KafkaTransactionState [transactionalId=Source:
> TableSourceScan(table=[[default_catalog, default_database,
> debezium_source]], fields=[data]) - Sink:
> Sink(table=[default_catalog.default_database.KafkaTable],
> fields=[data])-7df19f87deec5680128845fd9a6ca18d-0, producerId=239009,
> epoch=216]. Presumably this transaction has been already committed before



-- 
Best,
  pp


Re: Does Flink support custom rate limiting

2021-06-01 Thread r pp
Isn't Flink's backpressure mechanism already a form of rate limiting?
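For reference, a minimal sketch of an explicit throttling operator (a user-level alternative or complement to backpressure), assuming Guava is available on the user classpath; the class name and rate handling are made up:

import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ThrottlingMapper<T> extends RichMapFunction<T, T> {
    private final double permitsPerSecond;
    private transient RateLimiter rateLimiter;

    public ThrottlingMapper(double permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
    }

    @Override
    public void open(Configuration parameters) {
        rateLimiter = RateLimiter.create(permitsPerSecond);
    }

    @Override
    public T map(T value) {
        rateLimiter.acquire(); // blocks until a permit is available, throttling the pipeline
        return value;
    }
}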

suisuimu <726400...@qq.com> wrote on Tue, Jun 1, 2021 at 5:37 PM:

> When Flink reads data from Kafka, does it support user-defined rate-limiting strategies?
> For example, setting flow-control rules based on the name of a certain field in the message.
> Is this supported, or do we need to implement it ourselves with a third-party component (e.g., Sentinel)?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best,
  pp


Re: PyFlink JDBC question

2021-06-01 Thread ????
pycharmwindowsfile:///D:\Pyproject\flink-connector-jdbc_2.11-1.13.1.jar;
  ??


----
??: 
   ""   
 <1129656...@qq.com;
:2021??6??1??(??) 5:30
??:"user-zh"https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q 
pyflink??jdbc??jar??jdbc??flink??1.13.1
 
 
 
 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.table import StreamTableEnvironment, EnvironmentSettings
 env = StreamExecutionEnvironment.get_execution_environment()
 table_env = StreamTableEnvironment.create( 
env,environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
 
table_env.get_config().get_configuration().set_string("pipeline.classpaths", 
"file:///home/flink/lib/flink-connector-jdbc_2.12-1.13.1.jar;file:///home/flink/lib/mysql-connector-java-8.0.25.jar")
 
 
 # 2. create source Table
 table_env.execute_sql("""
 
 
 CREATE TABLE table_source (
 nbsp; e string
 ) WITH (
 nbsp;'connector' = 'jdbc',
 nbsp; 'url' = 'jdbc:mysql://:3306/test',
 nbsp; 'driver' = 'com.mysql.cj.jdbc.Driver',
 nbsp; 'table-name' = 'enum_test',
 nbsp; 'username' = 'pms_etl',
 nbsp; 'password' = 'pms_etl_q'
 )
 
 
 """)
 
 
 # 3. create sink Table
 table_env.execute_sql("""
 nbsp; nbsp; CREATE TABLE print (
 nbsp; nbsp; nbsp; nbsp; e string
 nbsp; nbsp; ) WITH (
 nbsp; nbsp; nbsp; nbsp; 'connector' = 'print'
 nbsp; nbsp; )
 """)
 nbsp; nbsp;
 
 
 table_env.execute_sql("INSERT INTO table_sink SELECT * FROM 
table_source").wait()
 
 
 
 python
 
 
 Traceback (most recent call last):
 nbsp; File "demo.py", line 41, in 

Re: Different programs report the same exception during the same time period

2021-06-01 Thread r pp
What does your network environment look like? Is it running on Docker, or something else?
From the error, it is caused by netty being unable to decode, but why would this happen?
Perhaps you could describe the problem in a bit more detail.

5599 <673313...@qq.com> wrote on Tue, Jun 1, 2021 at 2:32 PM:

> Unsubscribe
>
>
>
>
> --Original Message--
> From: "r pp" Sent: Tuesday, June 1, 2021, 2:07 PM
> To: "user-zh" Subject: Re: Different programs report the same exception during the same time period
>
>
>
> Did your program crash?
>
> mq sun 
>  Hi everyone:
>  Recently in production, two Flink programs from different project teams both reported the exception below during the same time period
>  ERROR org.apache.flink.runtime.blob.BlobServerConnection -Error
> while
>  excuting Blob connection
>  .
>  .
>  .
> 
> 
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException
>  :Adjusted frame length exceeds 10485760: 1347375960 -discarded
> 
> 
> 
> Regarding the error messages above: when we first learned about one of the errors, we suspected it was caused by the state being too large. But later another program reported the same error, and now we are not sure what is causing it. Could someone advise what the possible cause might be?
> 
>
>
> --
> Best,
>  pp



-- 
Best,
  pp


Does Flink support custom rate limiting

2021-06-01 Thread suisuimu
When Flink reads data from Kafka, does it support user-defined rate-limiting strategies?
For example, setting flow-control rules based on the name of a certain field in the message.
Is this supported, or do we need to implement it ourselves with a third-party component (e.g., Sentinel)?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Custom UDF with state

2021-06-01 Thread 阿华田
I have a custom UDF that implements CheckpointedFunction.
The pseudocode is below; I found that initializeState is never executed.






public class ClusterInfoCollectUdf extends ScalarFunction implements CheckpointedFunction {

    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterInfoCollectUdf.class);
    private transient MapState<String, Integer> mapState;
    private MapStateDescriptor<String, Integer> mapStateDescriptor;
    ...

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        LOGGER.info("the snapshotState is started");
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        mapStateDescriptor = new MapStateDescriptor<>(
                "app-status-map",
                String.class,
                Integer.class);

        mapState = context.getKeyedStateStore().getMapState(mapStateDescriptor);
        LOGGER.info("the initializeState is started");
    }
}





| |
阿华田
|
|
a15733178...@163.com
|
Signature customized by NetEase Mail Master



Re: PyFlink JDBC question

2021-06-01 Thread ????
2.11??


----
??: 
   "user-zh"

https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q 
pyflink??jdbc??jar??jdbc??flink??1.13.1
 
 
 
 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.table import StreamTableEnvironment, EnvironmentSettings
 env = StreamExecutionEnvironment.get_execution_environment()
 table_env = StreamTableEnvironment.create( 
env,environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
 
table_env.get_config().get_configuration().set_string("pipeline.classpaths", 
"file:///home/flink/lib/flink-connector-jdbc_2.12-1.13.1.jar;file:///home/flink/lib/mysql-connector-java-8.0.25.jar")
 
 
 # 2. create source Table
 table_env.execute_sql("""
 
 
 CREATE TABLE table_source (
 nbsp; e string
 ) WITH (
 nbsp;'connector' = 'jdbc',
 nbsp; 'url' = 'jdbc:mysql://:3306/test',
 nbsp; 'driver' = 'com.mysql.cj.jdbc.Driver',
 nbsp; 'table-name' = 'enum_test',
 nbsp; 'username' = 'pms_etl',
 nbsp; 'password' = 'pms_etl_q'
 )
 
 
 """)
 
 
 # 3. create sink Table
 table_env.execute_sql("""
 nbsp; nbsp; CREATE TABLE print (
 nbsp; nbsp; nbsp; nbsp; e string
 nbsp; nbsp; ) WITH (
 nbsp; nbsp; nbsp; nbsp; 'connector' = 'print'
 nbsp; nbsp; )
 """)
 nbsp; nbsp;
 
 
 table_env.execute_sql("INSERT INTO table_sink SELECT * FROM 
table_source").wait()
 
 
 
 python
 
 
 Traceback (most recent call last):
 nbsp; File "demo.py", line 41, in 

How to define a custom UDF with state

2021-06-01 Thread 阿华田
I have a custom UDF that implements CheckpointedFunction.
The pseudocode is below; I found that initializeState is never executed.






public class ClusterInfoCollectUdf extends ScalarFunction implements CheckpointedFunction {

    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterInfoCollectUdf.class);
    private transient MapState<String, Integer> mapState;
    private MapStateDescriptor<String, Integer> mapStateDescriptor;
    ...

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        LOGGER.info("the snapshotState is started");
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        mapStateDescriptor = new MapStateDescriptor<>(
                "app-status-map",
                String.class,
                Integer.class);

        mapState = context.getKeyedStateStore().getMapState(mapStateDescriptor);
        LOGGER.info("the initializeState is started");
    }
}



| |
阿华田
|
|
a15733178...@163.com
|
Signature customized by NetEase Mail Master



Re: PyFlink JDBC question

2021-06-01 Thread Dian Fu
Hi,

Local execution:
1) Try flink-connector-jdbc_2.11-1.13.1.jar? The JAR bundled with PyFlink by default is built for Scala 2.11.


flink run:
1) The sink table you registered is named “print”, not ”table_sink”, but the SQL statement uses table_sink.


> On Jun 1, 2021, at 4:33 PM, 琴师 <1129656...@qq.com> wrote:
> 
> Hi,
>  Following the WeChat article https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q 
> I tried to use PyFlink and ran into a problem. I referenced the JDBC JAR, but it still reports a JDBC error. My Flink version is 1.13.1.
> My original code is as follows:
> 
> 
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
> env = StreamExecutionEnvironment.get_execution_environment()
> table_env = StreamTableEnvironment.create( 
> env,environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
> table_env.get_config().get_configuration().set_string("pipeline.classpaths", 
> "file:///home/flink/lib/flink-connector-jdbc_2.12-1.13.1.jar;file:///home/flink/lib/mysql-connector-java-8.0.25.jar")
> 
> 
> # 2. create source Table
> table_env.execute_sql("""
> 
> 
> CREATE TABLE table_source (
>  e string
> ) WITH (
> 'connector' = 'jdbc',
>  'url' = 'jdbc:mysql://:3306/test',
>  'driver' = 'com.mysql.cj.jdbc.Driver',
>  'table-name' = 'enum_test',
>  'username' = 'pms_etl',
>  'password' = 'pms_etl_q'
> )
> 
> 
> """)
> 
> 
> # 3. create sink Table
> table_env.execute_sql("""
>   CREATE TABLE print (
> e string
>   ) WITH (
> 'connector' = 'print'
>   )
> """)
>  
> 
> 
> table_env.execute_sql("INSERT INTO table_sink SELECT * FROM 
> table_source").wait()
> 
> 
> 
> When I run it directly with python, the returned error is as follows
> 
> 
> Traceback (most recent call last):
>  File "demo.py", line 41, intable_env.execute_sql("INSERT INTO table_sink SELECT * FROM 
> table_source").wait()
>  File 
> "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", 
> line 804, in execute_sql
>   return TableResult(self._j_tenv.executeSql(stmt))
>  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", 
> line 1286, in __call__
>   answer, self.gateway_client, self.target_id, self.name)
>  File 
> "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 
> 146, in deco
>   return f(*a, **kw)
>  File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 
> 328, in get_return_value
>   format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
> : org.apache.flink.table.api.ValidationException: Unable to create a source 
> for reading table 'default_catalog.default_database.table_source'.
> 
> 
> Table options are:
> 
> 
> 'connector'='jdbc'
> 'driver'='com.mysql.cj.jdbc.Driver'
> 'password'='pms_etl_q'
> 'table-name'='enum_test'
> 'url'='jdbc:mysql://***:3306/test'
> 'username'='pms_etl'
> at 
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
> at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
> at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:170)
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:162)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:967)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:936)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:275)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:595)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:268)
> at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
> at 
> 

Re: The size of Flink SQL's /checkpoint/shared/ folder keeps growing even though the source data has no surge; how can this be controlled?

2021-06-01 Thread Yun Tang
Hi,

Incremental checkpoints upload the sst files themselves, and part of that space may be occupied by stale data. You can think of it as incremental checkpoints uploading RocksDB data that is subject to space amplification. If the data volume on a single machine is small and compaction has not been triggered in time, the total data in the remote checkpoint directory can indeed be larger than the actual live data. With incremental checkpoints disabled, what gets uploaded is key/value pairs in the same format as a savepoint: Flink iterates over the whole DB and writes the currently valid data out to the remote storage. So if you disable incremental checkpoints and find that the checkpoint directory stays at a constant size, it means the space taken by the truly valid data is stable.

Also, it is actually not recommended to disable incremental checkpoints in day-to-day production. The main reason is that, for large-scale jobs, full checkpoints increase the amount of data that has to be uploaded to the underlying DFS on every checkpoint, and they also increase the end-to-end duration of a single checkpoint, with the risk of checkpoints failing due to timeouts.

Best regards,
Yun Tang
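For reference, a minimal sketch of toggling incremental checkpoints (assuming the flink-statebackend-rocksdb dependency is on the classpath; the checkpoint path and interval are placeholders, and RocksDBStateBackend is the older-style API that newer releases supersede with EmbeddedRocksDBStateBackend):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointToggle {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds

        // The second argument enables incremental checkpoints (upload changed sst files only);
        // set it to false to upload a full snapshot of the valid key/value data every time.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///tmp/flink-checkpoints", true));

        // ... build the job and call env.execute(...)
    }
}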

From: HunterXHunter <1356469...@qq.com>
Sent: Tuesday, June 1, 2021 11:44
To: user-zh@flink.apache.org 
Subject: Re: The size of Flink SQL's /checkpoint/shared/ folder keeps growing even though the source data has no surge; how can this be controlled?

The problem I ran into was that after enabling incremental checkpoints, the checkpoints kept getting bigger and bigger; after disabling them, the checkpoints went back to normal.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Pyflink jdbc????

2021-06-01 Thread ????
Hi,
Following the guide at https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q, I am using the jdbc connector from PyFlink. The jdbc connector and driver jars have been added, but the jdbc connector still fails with the error below. The Flink version is 1.13.1.



from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create( 
env,environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
table_env.get_config().get_configuration().set_string("pipeline.classpaths", 
"file:///home/flink/lib/flink-connector-jdbc_2.12-1.13.1.jar;file:///home/flink/lib/mysql-connector-java-8.0.25.jar")


# 2. create source Table
table_env.execute_sql("""


CREATE TABLE table_source (
 e string
) WITH (
'connector' = 'jdbc',
 'url' = 'jdbc:mysql://:3306/test',
 'driver' = 'com.mysql.cj.jdbc.Driver',
 'table-name' = 'enum_test',
 'username' = 'pms_etl',
 'password' = 'pms_etl_q'
)


""")


# 3. create sink Table
table_env.execute_sql("""
  CREATE TABLE print (
e string
  ) WITH (
'connector' = 'print'
  )
""")
 


table_env.execute_sql("INSERT INTO table_sink SELECT * FROM 
table_source").wait()



Running the script with python produces the following traceback:


Traceback (most recent call last):
 File "demo.py", line 41, in 

Flink SQL 1.13.1: unexpected changelog output from a join

2021-06-01 Thread ??????
|insert into dwd_order_detail
|select
|   ord.Id,
|   ord.Code,
|   Status,
|   concat(cast(ord.Id as STRING), if(oed.Id is null, 'oed_null', cast(oed.Id as STRING)), DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) as uuids,
|   TO_DATE(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) as As_Of_Date
|from
|   orders ord
|left join order_extend oed on ord.Id = oed.OrderId and oed.IsDeleted = 0 and
|   oed.CreateTimeCAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
|where ( ord.OrderTimeCAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
|or ord.ReviewTimeCAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
|or ord.RejectTimeCAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
|) and ord.IsDeleted = 0;
[The remainder of this message is garbled in the archive. The recoverable details: both source tables, orders and order_extend, are Kafka topics in canal-json format; the join result is written to an upsert-kafka sink; the writer compares the behaviour on Flink 1.12 and 1.13. The quoted output shows a long run of +U changelog rows with Status 50 for order codes such as XJ0120210531004794, followed by +I and then -U rows for that same code.]

Re: How to use or configure flink checkpointing with siddhi internal state

2021-06-01 Thread Till Rohrmann
Hi Dipanjan,

I am assuming that you are using the flink-siddhi library [1]. I am not an
expert but it looks as if the AbstractSiddhiOperator overrides the
snapshotState [2] method to store the Siddhi state in Flink.

[1] https://github.com/haoch/flink-siddhi
[2]
https://github.com/haoch/flink-siddhi/blob/master/core/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java#L331
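
For readers unfamiliar with that hook, here is a minimal illustrative sketch of the general pattern such an operator can follow (this is not the actual flink-siddhi code, and the snapshotEngine/restoreEngine helpers are hypothetical stand-ins for the engine's own snapshot API): the operator serializes the embedded engine's state into Flink operator state in snapshotState and feeds it back in initializeState, so the job's regular checkpointing configuration covers it.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class EngineStateOperator<IN, OUT> extends AbstractStreamOperator<OUT>
        implements OneInputStreamOperator<IN, OUT> {

    // Operator state holding the serialized snapshot of the embedded CEP engine.
    private transient ListState<byte[]> engineSnapshotState;

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        engineSnapshotState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("engine-snapshot", byte[].class));
        if (context.isRestored()) {
            for (byte[] snapshot : engineSnapshotState.get()) {
                restoreEngine(snapshot); // hypothetical helper: feed the bytes back into the engine
            }
        }
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        engineSnapshotState.clear();
        engineSnapshotState.add(snapshotEngine()); // hypothetical helper: serialize the engine's state
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // Feed the event into the embedded engine and emit its results via `output`.
    }

    private byte[] snapshotEngine() {
        return new byte[0]; // placeholder
    }

    private void restoreEngine(byte[] snapshot) {
        // placeholder
    }
}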

Cheers,
Till

On Mon, May 31, 2021 at 7:27 PM Dipanjan Mazumder 
wrote:

> Hi,
>    I was trying to set up checkpointing while using Siddhi as the CEP engine
> running on Flink. Siddhi windowing uses internal state to aggregate events or
> perform operations on a bucket of events pertaining to a specific time window.
> What I am not sure about is how that state can be mapped to Flink's internal
> state so that I can use Flink checkpointing to safeguard the internal state of
> the Siddhi operators in the face of failure.
> Any help or pointers on this would be of great help to me. Thanks in advance.
> Dipanjan -


????

2021-06-01 Thread on the way


Error when restoring from a savepoint with Kafka under exactly-once semantics

2021-06-01 Thread 周瑞
Hello: with Kafka under exactly-once semantics, restoring from a savepoint fails. Preliminary investigation suggests that the Kafka transaction is using an old epoch. How should this problem be handled?
// TODO: pass these values in via configuration
env.setParallelism(1);
env.enableCheckpointing(60L, CheckpointingMode.EXACTLY_ONCE);

// Checkpoint retention policy: retain checkpoints even if the job is explicitly cancelled
env.getCheckpointConfig()

.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointTimeout(6);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);

// TODO: HDFS must be used in production
env.setStateBackend(new 
FsStateBackend("hdfs://10.10.98.226:8020/tmp/checkpoint66"));

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
public static final  String TABLE_NAME = "KafkaTable";
public static final  String COLUMN_NAME = "source_value";

public static final String KAFKA_TABLE_FORMAT =
"CREATE TABLE "+TABLE_NAME+" (\n" +
"  "+COLUMN_NAME+" STRING\n" +
") WITH (\n" +
"   'connector' = 'kafka',\n" +
"   'topic' = '%s',\n" +
"   'properties.bootstrap.servers' = '%s',\n" +
"   'sink.semantic' = 'exactly-once',\n" +
"   'properties.transaction.timeout.ms' = '3',\n" +
"   'format' = 'dbz-json'\n" +
")\n";
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an 
operation with an old epoch. Either there is a newer producer with the same 
transactionalId,
 or the producer's transaction has been expired by the broker. while recovering 
transaction KafkaTransactionState [transactionalId=Source: 
TableSourceScan(table=[[default_catalog, default_database, debezium_source]], 
fields=[data]) - Sink: 
Sink(table=[default_catalog.default_database.KafkaTable], 
fields=[data])-7df19f87deec5680128845fd9a6ca18d-0, producerId=239009, 
epoch=216]. Presumably this transaction has been already committed before


Re: Different jobs reporting the same exception during the same time period

2021-06-01 Thread mq sun
Some of the jobs crashed, some did not.

r pp  wrote on Tuesday, June 1, 2021 at 2:07 PM:

> Did your job crash?
>
> mq sun  wrote on Monday, May 31, 2021 at 7:23 PM:
>
> > Hi all:
> >   Recently in production, two Flink jobs belonging to different teams both
> > reported the exception below during the same time period:
> > ERROR org.apache.flink.runtime.blob.BlobServerConnection  - Error while
> > executing Blob connection
> > .
> > .
> > .
> >
> >
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException
> > :Adjusted frame length exceeds 10485760: 1347375960 -discarded
> >
> >
> >
> Regarding the error above: when we first saw it in one of the jobs, we suspected it was caused by the state being too large. But later another job reported the same error, so now we are not sure what is causing it. Could anyone advise on what the likely cause might be?
> >
>
>
> --
> Best,
>   pp
>


Re: Different jobs reporting the same exception during the same time period

2021-06-01 Thread r pp
Did your job crash?

mq sun  wrote on Monday, May 31, 2021 at 7:23 PM:

> Hi all:
>   Recently in production, two Flink jobs belonging to different teams both
> reported the exception below during the same time period:
> ERROR org.apache.flink.runtime.blob.BlobServerConnection  - Error while
> executing Blob connection
> .
> .
> .
>
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException
> :Adjusted frame length exceeds 10485760: 1347375960 -discarded
>
>
> Regarding the error above: when we first saw it in one of the jobs, we suspected it was caused by the state being too large. But later another job reported the same error, so now we are not sure what is causing it. Could anyone advise on what the likely cause might be?
>


-- 
Best,
  pp