Re: Apache Flink - A question about Tables API and SQL interfaces

2021-05-13 Thread 张静
Hi Mans,
 +1 for Austin's reply.
 I would like to add something about "allow lateness".
 Since the windowing table-valued function was introduced in Flink 1.13,
users can choose between two SQL solutions for window aggregation, and the
'allow lateness' behavior differs between them.
1. If you adopt the windowing TVF window aggregation [2], 'allow lateness'
is not supported yet.
2. If you adopt the legacy grouped window functions [1], 'allow lateness'
is supported. However, use the feature with caution because it depends on
the state retention configuration (`table.exec.state.ttl` [3]), especially
if the job contains many operators besides the window aggregation. Please
see FLINK-21301 [4]; it may be resolved in Flink 1.14. A rough sketch of
both variants is shown below.
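
For reference, here is a minimal, untested sketch of the two variants in
Java. The table `Bid` and the columns `bidtime` / `price` are hypothetical
and would have to be registered first; the idle state retention value is
only an example.

import java.time.Duration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class WindowAggSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // State retention that the legacy group window behavior depends on [3].
        tEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));

        // 1. Windowing TVF aggregation (Flink 1.13+); 'allow lateness' is not supported yet [2].
        tEnv.executeSql(
            "SELECT window_start, window_end, SUM(price) "
                + "FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) "
                + "GROUP BY window_start, window_end");

        // 2. Legacy grouped window aggregation (deprecated) [1], where lateness handling exists.
        tEnv.executeSql(
            "SELECT TUMBLE_START(bidtime, INTERVAL '10' MINUTES), SUM(price) "
                + "FROM Bid "
                + "GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTES)");
    }
}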

Best,
beyond1920

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-agg/#group-window-aggregation-deprecated
[2]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-tvf/
[3]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/
[4]:https://issues.apache.org/jira/browse/FLINK-21301

Austin Cawley-Edwards  于2021年5月13日周四 下午9:57写道:
>
> Hi Mans,
>
> There are currently no public APIs for doing so, though if you're willing to 
> deal with some breaking changes there are some experimental config options 
> for late events in the Table API and SQL, seen in the WindowEmitStrategy 
> class[1].
>
> Best,
> Austin
>
> [1]: 
> https://github.com/apache/flink/blob/607919c6ea6983ae5ad3f82d63b7d6455c73d225/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala#L173-L211
>
> On Wed, May 12, 2021 at 5:12 PM M Singh  wrote:
>>
>> Thanks Austin for your helpful references.
>>
>> I did take a look at [2]/[3] - but did not find anything relevant on 
>> searching for string 'late' (for allowed lateness etc) or side output.  So 
>> from my understanding the late events will be dropped if I am using Table 
>> API or SQL and the only option is to use datastream interface.  Please let 
>> me know if I missed anything.
>>
>> Thanks again.
>>
>>
>> On Wednesday, May 12, 2021, 04:36:06 PM EDT, Austin Cawley-Edwards 
>>  wrote:
>>
>>
>> Hi Mans,
>>
>> I don't believe there are explicit triggers/evictors/timers in the Table 
>> API/ SQL, as that is abstracted away from the lower-level DataStream API. If 
>> you need to get into the fine-grained details, Flink 1.13 has made some good 
>> improvements in going from the Table API to the DataStream API, and back 
>> again. [1]
>>
>> For working with time and lateness with Table API and SQL, some good places 
>> to look are the GroupBy Window Aggregation section of the Table API docs[2], 
>> as well as the SQL cookbook[3] and Ververica's SQL training wiki[4].
>>
>> Hope that helps,
>> Austin
>>
>> [1]: 
>> https://flink.apache.org/news/2021/05/03/release-1.13.0.html#improved-interoperability-between-datastream-api-and-table-apisql
>> [2]: 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#groupby-window-aggregation
>> [3]: 
>> https://github.com/ververica/flink-sql-cookbook#aggregations-and-analytics
>> [4]: https://github.com/ververica/sql-training/wiki/Queries-and-Time
>>
>> On Wed, May 12, 2021 at 1:30 PM M Singh  wrote:
>>
>> Hey Folks:
>>
>> I have the following questions regarding Table API/SQL in streaming mode:
>>
>> 1. Is there a notion of triggers/evictors/timers when using the Table API or SQL 
>> interfaces ?
>> 2. Is there anything like side outputs and ability to define allowed 
>> lateness when dealing with the Table API or SQL interfaces ?
>>
>> If there are any alternate ways for the above when using Table API or SQL, 
>> please let me know where I can find the relevant documentation/examples.
>>
>> Thanks for your help.
>>
>> Mans
>>
>>
>>
>>


UpsertStreamTableSink requires that Table has a full primary keys if it is updated.

2021-05-13 Thread automths
Hi:

I implemented a custom UpsertStreamTableSink. When ingesting CDC data and writing it to this sink, the following exception is thrown:
Exception in thread "main" org.apache.flink.table.api.TableException: 
UpsertStreamTableSink requires that Table has a full primary keys if it is 
updated.
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)


I already set the primary key when building the TableSchema, but this error is still thrown.
My Flink version is flink-1.12.0.


Could you advise how to solve this problem?
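
For comparison, below is a minimal sketch of declaring the primary key on a
DDL-defined upsert sink instead of on the legacy sink's TableSchema; as far
as I understand, with the legacy UpsertStreamTableSink the planner derives
the key from the query rather than from the declared schema, which may be
related. All table, column, and connector options here are hypothetical.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class UpsertSinkDdlSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Hypothetical sink: the primary key is declared in DDL so the planner
        // can use it as the upsert key.
        tableEnv.executeSql(
            "CREATE TABLE my_upsert_sink ("
                + "  id BIGINT,"
                + "  name STRING,"
                + "  PRIMARY KEY (id) NOT ENFORCED"
                + ") WITH ("
                + "  'connector' = 'jdbc',"
                + "  'url' = 'jdbc:mysql://localhost:3306/db',"
                + "  'table-name' = 'my_table',"
                + "  'username' = 'user',"
                + "  'password' = 'pass'"
                + ")");
    }
}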




Best regards!
automths





How to handle an exception when extracting event time in a computed column?

2021-05-13 Thread forideal
Hi My Friends:


How should I handle an exception when extracting the event time in a computed column?
My DDL :
CREATE TABLE mytable (
  xx string,
  t as get_event_time_from_xx(xx),
  WATERMARK FOR t AS t - INTERVAL '1' SECOND
   ) WITH (xxx)
   If an exception occurs inside my function and I don’t want to generate a 
default time or an error time, how should I deal with it?
   Looking forward to your answer, thank you very much.
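
   One possible approach (just a sketch, not a confirmed recommendation):
let the UDF return NULL when extraction fails, so that no default or wrong
time is produced, and filter those rows out (e.g. WHERE t IS NOT NULL)
before windowing. The class below mirrors get_event_time_from_xx from the
DDL above; the parsing logic inside is made up.

import java.time.LocalDateTime;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

// Sketch of get_event_time_from_xx that swallows extraction failures and
// returns NULL instead of a default or wrong event time.
public class GetEventTimeFromXx extends ScalarFunction {

    public @DataTypeHint("TIMESTAMP(3)") LocalDateTime eval(String xx) {
        try {
            // Hypothetical extraction logic, e.g. parsing a leading ISO-8601 timestamp.
            return LocalDateTime.parse(xx.substring(0, 19));
        } catch (Exception e) {
            // Returning NULL avoids emitting a fabricated event time; the row can
            // then be filtered out downstream before the window aggregation.
            return null;
        }
    }
}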


Best,
forideal

Flink 1.13.0 reactive mode: Job stop and cannot restore from checkpoint

2021-05-13 Thread 陳昌倬
Hi,

We run our application on Flink 1.13.0, in a Kubernetes standalone
application cluster with reactive mode enabled. The application stopped
today and could not recover, so we tried to restore it from a checkpoint.
However, the application cannot restart from the checkpoint due to the
following error. We have no idea what this exception means, so any help
is welcome.


2021-05-14 01:55:37,204 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Close ResourceManager connection 
06d772aae2ab4afb8c6917dac40cd727: Stopping JobMaster for job 
rt-match_11.2.16_5d671ba3()..
2021-05-14 01:55:37,205 INFO  
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
Stopping DefaultLeaderRetrievalService.
2021-05-14 01:55:37,205 INFO  
org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] 
- Stopping 
KubernetesLeaderRetrievalDriver{configMapName='rt-match-flink-dev-resourcemanager-leader'}.
2021-05-14 01:55:37,205 INFO  
org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher [] 
- The watcher is closing.
2021-05-14 01:55:37,215 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Stopping DefaultLeaderElectionService.
2021-05-14 01:55:37,215 INFO  
org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriver [] 
- Closing 
KubernetesLeaderElectionDriver{configMapName='rt-match-flink-dev--jobmanager-leader'}.
2021-05-14 01:55:37,216 INFO  
org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher [] 
- The watcher is closing.
2021-05-14 01:55:37,342 INFO  
org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Removed job 
graph  from 
KubernetesStateHandleStore{configMapName='rt-match-flink-dev-dispatcher-leader'}.
2021-05-14 01:55:37,914 INFO  
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap 
[] - Application FAILED:
java.util.concurrent.CompletionException: 
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: 
Application Status: FAILED
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$4(ApplicationDispatcherBootstrap.java:304)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680) 
~[?:?]
at 
java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094) 
~[?:?]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.unwrapJobResultException(ApplicationDispatcherBootstrap.java:297)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$getApplicationResult$3(ApplicationDispatcherBootstrap.java:270)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[?:?]
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) 
~[?:?]
at 
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) 
~[?:?]
at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) 
~[?:?]
at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.getApplicationResult(ApplicationDispatcherBootstrap.java:272)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:221)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) 
~[?:?]
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) 
~[?:?]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:254)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at 

Re: Reply: Error when using Hive as a dimension table with Kafka as the source in a join

2021-05-13 Thread hehuiyuan
The ON clause is there; I just realized the problem description was incomplete.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Reply: Error when using Hive as a dimension table with Kafka as the source in a join

2021-05-13 Thread hehuiyuan
Sorry, the description was incomplete; there is an ON clause.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink and ceph

2021-05-13 Thread [sender name lost to encoding corruption]
Hi all,

[The body of this message was lost to encoding corruption; it appears to ask about using Ceph as storage with Flink.]

Best,


Luke Yan

Re: Root Exception can not be shown on Web UI in Flink 1.13.0

2021-05-13 Thread Guowei Ma
Hi, Gary

I think it might be a bug, so would you like to open a JIRA for this?
Could you also share the exception for which the TaskManagerLocation is null?
It would be very helpful for verifying the cause.

Best,
Guowei


On Thu, May 13, 2021 at 10:36 AM Yangze Guo  wrote:

> Hi, it seems to be related to FLINK-22276. Thus, I'd involve Matthias
> to take a look.
>
> @Matthias My gut feeling is that not every execution that has failureInfo
> has been deployed?
>
> Best,
> Yangze Guo
>
> On Wed, May 12, 2021 at 10:12 PM Gary Wu  wrote:
> >
> > Hi,
> >
> > We have upgraded our Flink applications to 1.13.0 but we found that Root
> Exception can not be shown on Web UI with an internal server error message.
> After opening the browser development console and tracing the message, we found
> that there is an exception in the jobmanager:
> >
> > 2021-05-12 13:30:45,589 ERROR
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler [] -
> Unhandled exception.
> > java.lang.IllegalArgumentException: The location must not be null for a
> non-global failure.
> > at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> > at
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.assertLocalExceptionInfo(JobExceptionsHandler.java:218)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> > at
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createRootExceptionInfo(JobExceptionsHandler.java:191)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> > at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> ~[?:?]
> > at java.util.stream.SliceOps$1$1.accept(SliceOps.java:199) ~[?:?]
> > at
> java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1632)
> ~[?:?]
> > at
> java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
> ~[?:?]
> > at
> java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
> ~[?:?]
> > at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488) ~[?:?]
> > at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> ~[?:?]
> > at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
> ~[?:?]
> > at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
> > at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
> ~[?:?]
> > at
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createJobExceptionHistory(JobExceptionsHandler.java:169)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> > at
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createJobExceptionsInfo(JobExceptionsHandler.java:154)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> > at
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.handleRequest(JobExceptionsHandler.java:101)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> > at
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.handleRequest(JobExceptionsHandler.java:63)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> > at
> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$0(AbstractExecutionGraphHandler.java:87)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> > at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> [?:?]
> > at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
> [?:?]
> > at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> [?:?]
> > at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
> > at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> [?:?]
> > at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> [?:?]
> > at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> [?:?]
> > at java.lang.Thread.run(Thread.java:834) [?:?]
> >
> > We would like to check: is there any configuration change that should be done
> for the application? Thanks!
> >
> > Regards,
> > -Gary
> >
> >
> >
> > APPIER EMAIL NOTICE
> >
> > The contents of this email message and any attachments from Appier Group
> Inc. and/or its affiliates may be privileged and confidential. If you are
> not the intended recipient of this email, please note that any disclosure,
> copying, distribution, or use of this message or its attachments is
> prohibited. If you have received this email in error, please contact us
> immediately and delete this message and any attachments.
>


Use cases for Flink ETL

2021-05-13 Thread 孙啸龙
Hi all:
   Area: ETL
   Apart from the difference in latency, which use cases that offline/batch processing can handle cannot be implemented with Flink in real time, and where does it fall short?

Reply: Error when using Hive as a dimension table with Kafka as the source in a join

2021-05-13 Thread allanqinjy
Look at the exception message: it says the temporal table join requires a primary key, but you have not defined one. Also, don't you need an ON clause in your join?
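
For illustration, a rough sketch of what the build side with a primary key and
a join with an ON clause might look like. The table, column, and connector
options below are hypothetical and only meant to show the shape of the
statements, not a verified fix for the Hive case.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TemporalJoinSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Hypothetical build-side (dimension) table with a declared primary key.
        tableEnv.executeSql(
            "CREATE TABLE dim_sku ("
                + "  item_sku_id STRING,"
                + "  premium DOUBLE,"
                + "  PRIMARY KEY (item_sku_id) NOT ENFORCED"
                + ") WITH ("
                + "  'connector' = 'jdbc',"
                + "  'url' = 'jdbc:mysql://localhost:3306/db',"
                + "  'table-name' = 'dim_sku'"
                + ")");

        // Temporal join with an explicit ON condition on the key column.
        tableEnv.executeSql(
            "SELECT a.opt, a.src, b.premium "
                + "FROM jdqTableSources AS a "
                + "JOIN dim_sku FOR SYSTEM_TIME AS OF a.proctime AS b "
                + "ON a.src = b.item_sku_id");
    }
}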


allanqinjy
allanqi...@163.com


On 2021-05-14 09:32, hehuiyuan wrote:
select 
FROM  jdqTableSources AS a
JOIN tmmmp FOR SYSTEM_TIME AS OF a.proctime AS b



Caused by: org.apache.flink.client.program.ProgramInvocationException: The
main method caused an error: Temporal Table Join requires primary key in
versioned table, but no primary key can be found. The physical plan is:
FlinkLogicalJoin(condition=[AND(=($0, $4),
__INITIAL_TEMPORAL_JOIN_CONDITION($3, __TEMPORAL_JOIN_LEFT_KEY($0),
__TEMPORAL_JOIN_RIGHT_KEY($4)))], joinType=[inner])
FlinkLogicalCalc(select=[opt, src, cur, PROCTIME() AS proctime])
FlinkLogicalTableSourceScan(table=[[default_catalog, default_database,
jdqTableSources]], fields=[mid, db, sch, tab, opt, ts, ddl, err, src, cur,
cus])
FlinkLogicalSnapshot(period=[$cor0.proctime])
FlinkLogicalCalc(select=[item_sku_id, premium, cate_lev, type, borc])
FlinkLogicalTableSourceScan(table=[[myhive, dev,
dev_brokenscreen_insurance_sku_info]], fields=[item_sku_id, item_sku_name,
premium, cate_lev, type, borc, plan_code, subjection_b, product_name,
lev_low_price, lev_upp_price, jd_price, shelves_tm, item_first_cate_name,
item_second_cate_name, item_third_cate_name, sure_cate_lev, flag])



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Error when using Hive as a dimension table with Kafka as the source in a join

2021-05-13 Thread hehuiyuan
select 
FROM  jdqTableSources AS a
JOIN tmmmp FOR SYSTEM_TIME AS OF a.proctime AS b



Caused by: org.apache.flink.client.program.ProgramInvocationException: The
main method caused an error: Temporal Table Join requires primary key in
versioned table, but no primary key can be found. The physical plan is:
FlinkLogicalJoin(condition=[AND(=($0, $4),
__INITIAL_TEMPORAL_JOIN_CONDITION($3, __TEMPORAL_JOIN_LEFT_KEY($0),
__TEMPORAL_JOIN_RIGHT_KEY($4)))], joinType=[inner])
  FlinkLogicalCalc(select=[opt, src, cur, PROCTIME() AS proctime])
FlinkLogicalTableSourceScan(table=[[default_catalog, default_database,
jdqTableSources]], fields=[mid, db, sch, tab, opt, ts, ddl, err, src, cur,
cus])
  FlinkLogicalSnapshot(period=[$cor0.proctime])
FlinkLogicalCalc(select=[item_sku_id, premium, cate_lev, type, borc])
  FlinkLogicalTableSourceScan(table=[[myhive, dev,
dev_brokenscreen_insurance_sku_info]], fields=[item_sku_id, item_sku_name,
premium, cate_lev, type, borc, plan_code, subjection_b, product_name,
lev_low_price, lev_upp_price, jd_price, shelves_tm, item_first_cate_name,
item_second_cate_name, item_third_cate_name, sure_cate_lev, flag])



--
Sent from: http://apache-flink.147419.n8.nabble.com/


MySQL CDC event time

2021-05-13 Thread 流弊
Hi, I'd like to ask: when MySQL CDC is used as a dimension table in a LEFT JOIN, can processing time be used? In my tests it seems only event time works.



Sent from my iPhone

Re: RabbitMQ source does not stop unless message arrives in queue

2021-05-13 Thread John Morrow
Hi Jose, hey Austin!!

I know we were just recently looking at trying to consume a fixed number of 
messages from an RMQ source, process them, and output them to an RMQ sink. As a 
naive first attempt at stopping the job when the target number of messages had 
been processed, we put counter state in the process function and tried 
throwing an exception when the counter >= the target message count.
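
Roughly, the naive attempt looked like the following (a reconstructed sketch 
rather than our exact code; the class, state, and key names are made up):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Count processed messages in keyed state and fail the job once the target
// is reached, relying on the noRestart strategy to end the job.
public class StopAfterNMessages extends KeyedProcessFunction<String, String, String> {

    private final long target;
    private transient ValueState<Long> count;

    public StopAfterNMessages(long target) {
        this.target = target;
    }

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        long current = count.value() == null ? 0L : count.value();
        current++;
        count.update(current);
        out.collect(value);
        if (current >= target) {
            // Failing the job here is not a clean stateful shutdown, which is likely
            // why un-acked messages were redelivered to the input queue.
            throw new RuntimeException("Reached target message count, failing the job on purpose");
        }
    }
}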

The job had:

  *   parallelism: 1
  *   checkpointing: 1000 (1 sec)
  *   restartStrategy: noRestart
  *   prefetchCount: 100

Running it with 150 messages in the input queue and 150 also as the target 
number, at the end the queues had:

  *   output queue - 150
  *   input queue - 50

So it looks like it did transfer all the messages, but some unack'd ones also 
got requeued back at the source and so ended up as duplicates. I know throwing an 
exception in the Flink job is not the same as triggering a stateful shutdown, 
but it might be hitting similar unack issues.

John


From: Austin Cawley-Edwards 
Sent: Thursday 13 May 2021 16:49
To: Jose Vargas ; John Morrow 

Cc: user 
Subject: Re: RabbitMQ source does not stop unless message arrives in queue

Hey Jose,

Thanks for bringing this up – it indeed sounds like a bug. There is ongoing 
work to update the RMQ source to the new interface, which might address some of 
these issues (or should, if it is not already), tracked in FLINK-20628[1]. 
Would you be able to create a JIRA issue for this, or would you like me to?

At my previous company, we only consumed one Rabbit queue per application, so 
we didn't run into this exactly but did see other weird behavior in the RMQ 
source that could be related. I'm going to cc @John 
Morrow, who might be able to contribute 
what he's seen working with the source, if he's around. I remember some 
messages not properly being ack'ed during a stateful shutdown via the Ververica 
Platform's stop-with-savepoint functionality that you mention, though that 
might be more related to FLINK-20244[2], perhaps.


Best,
Austin

[1]: https://issues.apache.org/jira/browse/FLINK-20628
[2]: https://issues.apache.org/jira/browse/FLINK-20244

On Thu, May 13, 2021 at 10:23 AM Jose Vargas 
<jose.var...@fiscalnote.com> wrote:
Hi,

I am using Flink 1.12 to read from and write to a RabbitMQ cluster. Flink's 
RabbitMQ source has some surprising behavior when a stop-with-savepoint request 
is made.

Expected Behavior:
The stop-with-savepoint request stops the job with a FINISHED state.

Actual Behavior:
The stop-with-savepoint request either times out or hangs indefinitely unless a 
message arrives in all the queues that the job consumes from after the 
stop-with-savepoint request is made.


I know that one possible workaround is to send a sentinel value to each of the 
queues consumed by the job that the deserialization schema checks in its 
isEndOfStream method. However, this is somewhat cumbersome and complicates the 
continuous delivery of a Flink job. For example, Ververica Platform will 
trigger a stop-with-savepoint for the user if one of many possible Flink 
configurations for a job are changed. The stop-with-savepoint can then hang 
indefinitely because only some of the RabbitMQ sources will have reached a 
FINISHED state.

I have attached the TaskManager thread dump after the stop-with-savepoint 
request was made. Most every thread is either sleeping or waiting around for 
locks to be released, and then there are a handful of threads trying to read 
data from a socket via the com.rabbitmq.client.impl.Frame.readFrom method.

Ideally, once a stop-with-savepoint request is made, the threads trying to read 
data from RabbitMQ would be interrupted so that all RabbitMQ sources would 
reach a FINISHED state.

Regular checkpoints and savepoints complete successfully, it is only the 
stop-with-savepoint request where I see this behavior.


Respectfully,


Jose Vargas

Software Engineer, Data Engineering

E: jose.var...@fiscalnote.com

fiscalnote.com  |  
info.cq.com  | rollcall.com



How can Flink CEP handle out-of-order data?

2021-05-13 Thread sherlock zw
Hi everyone, I'd like to ask whether Flink CEP can do pattern matching over an out-of-order data stream.
By out-of-order events I mean: for example, there are two events, A and B; within one time window, as long as both A and B are matched, regardless of the order in which A and B arrive, I consider my condition satisfied.
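
For context, the closest sketch I could come up with so far (CEP patterns are
inherently ordered, so this matches any two A/B events within the window and
then checks that both types are present; the Event class and its field are
made up):

import java.util.List;
import java.util.Map;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class UnorderedPairSketch {

    // Hypothetical event type with a "type" field that is either "A" or "B".
    public static class Event {
        public String type;
    }

    public static DataStream<String> detect(DataStream<Event> input) {
        // Match any two A/B events within the window, regardless of arrival order.
        Pattern<Event, ?> pair = Pattern.<Event>begin("pair")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) {
                        return "A".equals(e.type) || "B".equals(e.type);
                    }
                })
                .times(2)
                .within(Time.seconds(60));

        PatternStream<Event> matches = CEP.pattern(input, pair);

        // Keep only the matches that contain both an A and a B.
        return matches.flatSelect(new PatternFlatSelectFunction<Event, String>() {
            @Override
            public void flatSelect(Map<String, List<Event>> match, Collector<String> out) {
                List<Event> events = match.get("pair");
                boolean hasA = events.stream().anyMatch(e -> "A".equals(e.type));
                boolean hasB = events.stream().anyMatch(e -> "B".equals(e.type));
                if (hasA && hasB) {
                    out.collect("A and B both seen within the window");
                }
            }
        });
    }
}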


Re: RabbitMQ source does not stop unless message arrives in queue

2021-05-13 Thread Austin Cawley-Edwards
Hey Jose,

Thanks for bringing this up – it indeed sounds like a bug. There is ongoing
work to update the RMQ source to the new interface, which might address
some of these issues (or should, if it is not already), tracked in
FLINK-20628[1]. Would you be able to create a JIRA issue for this, or would
you like me to?

At my previous company, we only consumed one Rabbit queue per application,
so we didn't run into this exactly but did see other weird behavior in the
RMQ source that could be related. I'm going to cc @John Morrow
 who might be able to contribute what he's
seen working with the source, if he's around. I remember some messages not
properly being ack'ed during a stateful shutdown via the Ververica
Platform's stop-with-savepoint functionality that you mention, though that
might be more related to FLINK-20244[2], perhaps.


Best,
Austin

[1]: https://issues.apache.org/jira/browse/FLINK-20628
[2]: https://issues.apache.org/jira/browse/FLINK-20244

On Thu, May 13, 2021 at 10:23 AM Jose Vargas 
wrote:

> Hi,
>
> I am using Flink 1.12 to read from and write to a RabbitMQ cluster.
> Flink's RabbitMQ source has some surprising behavior when a
> stop-with-savepoint request is made.
>
> *Expected Behavior:*
> The stop-with-savepoint request stops the job with a FINISHED state.
>
> *Actual Behavior:*
> The stop-with-savepoint request either times out or hangs indefinitely
> unless a message arrives in all the queues that the job consumes from after
> the stop-with-savepoint request is made.
>
>
> I know that one possible workaround is to send a sentinel value to each of
> the queues consumed by the job that the deserialization schema checks in
> its isEndOfStream method. However, this is somewhat cumbersome and
> complicates the continuous delivery of a Flink job. For example,
> Ververica Platform will trigger a stop-with-savepoint for the user if one
> of many possible Flink configurations for a job are changed. The
> stop-with-savepoint can then hang indefinitely because only some of the
> RabbitMQ sources will have reached a FINISHED state.
>
> I have attached the TaskManager thread dump after the stop-with-savepoint
> request was made. Most every thread is either sleeping or waiting around
> for locks to be released, and then there are a handful of threads trying to
> read data from a socket via the com.rabbitmq.client.impl.Frame.readFrom
> method.
>
> Ideally, once a stop-with-savepoint request is made, the threads trying to
> read data from RabbitMQ would be interrupted so that all RabbitMQ sources
> would reach a FINISHED state.
>
> Regular checkpoints and savepoints complete successfully, it is only the
> stop-with-savepoint request where I see this behavior.
>
>
> Respectfully,
>
>
> Jose Vargas
>
> Software Engineer, Data Engineering
>
> E: jose.var...@fiscalnote.com
>
> fiscalnote.com   |  info.cq.com
>   | rollcall.com 
>
>


Incorrect flink documentation example

2021-05-13 Thread Tejas
Hi,
I was trying out the examples given here
to convert a DataStream to a Table, and one of the examples gives an error.
My code is as follows:

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<User> dataStream =
    env.fromElements(
            new User("Alice", 4, Instant.ofEpochMilli(1000)),
            new User("Bob", 6, Instant.ofEpochMilli(1001)),
            new User("Alice", 10, Instant.ofEpochMilli(1002)))
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.<User>forBoundedOutOfOrderness(Duration.ofSeconds(60)));

StreamTableEnvironment tableEnv =
    StreamTableEnvironment.create(env);
Table table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column("event_time", "TIMESTAMP(3)")
            .column("name", "STRING")
            .column("score", "INT")
            .watermark("event_time", "SOURCE_WATERMARK()")
            .build());
table.printSchema();

table.select($("*")).execute().print();

and the error I got is:
Caused by: org.codehaus.commons.compiler.CompileException: Line 13, Column
103: Cannot cast "java.time.Instant" to "java.time.LocalDateTime"
at 
org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5051)
at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4418)
at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4396)
at org.codehaus.janino.Java$Cast.accept(Java.java:4898)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)

Can somebody help with this?
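
For reference, a variant I have not verified: since java.time.Instant maps to
TIMESTAMP_LTZ(3) rather than TIMESTAMP(3), declaring the column as
TIMESTAMP_LTZ(3) might avoid the cast (fragment only, reusing tableEnv and
dataStream from the snippet above):

// Untested: declare event_time as TIMESTAMP_LTZ(3), the SQL type that
// java.time.Instant maps to, instead of TIMESTAMP(3).
Table table2 =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column("event_time", "TIMESTAMP_LTZ(3)")
            .column("name", "STRING")
            .column("score", "INT")
            .watermark("event_time", "SOURCE_WATERMARK()")
            .build());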



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Long checkpoint duration for Kafka source operators

2021-05-13 Thread Hubert Chen
Hello,

I have an application that reads from two Kafka sources, joins them, and
produces to a Kafka sink. The application is experiencing long end to end
checkpoint durations for the Kafka source operators. I'm hoping I could get
some direction on how to debug this further.

Here is a UI screenshot of a checkpoint instance:

[image: checkpoint.png]

My goal is to bring the total checkpoint duration to sub-minute.

Here are some observations I made:

   - Each source operator task has an E2E checkpoint duration of 1m 7s
   - Each source operator task has sub 100ms sync, async, aligned buffered,
   and start delay
   - Each join operator task has a start delay of 1m 7s
   - There is no backpressure in any operator

These observations lead me to believe that the source operator is
taking a long time to checkpoint. I find this a bit strange as
the fused operator is fairly light. It deserializes the event, assigns
a watermark, and might perform two filters. In addition, it's odd that both
source operators have tasks with exactly the same E2E checkpoint duration.

Is there some sort of locking that's occurring on the source operators that
can explain these long E2E durations?

Best,
Hubert


RabbitMQ source does not stop unless message arrives in queue

2021-05-13 Thread Jose Vargas
Hi,

I am using Flink 1.12 to read from and write to a RabbitMQ cluster. Flink's
RabbitMQ source has some surprising behavior when a stop-with-savepoint
request is made.

*Expected Behavior:*
The stop-with-savepoint request stops the job with a FINISHED state.

*Actual Behavior:*
The stop-with-savepoint request either times out or hangs indefinitely
unless a message arrives in all the queues that the job consumes from after
the stop-with-savepoint request is made.


I know that one possible workaround is to send a sentinel value to each of
the queues consumed by the job that the deserialization schema checks in
its isEndOfStream method. However, this is somewhat cumbersome and
complicates the continuous delivery of a Flink job. For example,
Ververica Platform will trigger a stop-with-savepoint for the user if one
of many possible Flink configurations for a job are changed. The
stop-with-savepoint can then hang indefinitely because only some of the
RabbitMQ sources will have reached a FINISHED state.

I have attached the TaskManager thread dump after the stop-with-savepoint
request was made. Most every thread is either sleeping or waiting around
for locks to be released, and then there are a handful of threads trying to
read data from a socket via the com.rabbitmq.client.impl.Frame.readFrom
method.

Ideally, once a stop-with-savepoint request is made, the threads trying to
read data from RabbitMQ would be interrupted so that all RabbitMQ sources
would reach a FINISHED state.

Regular checkpoints and savepoints complete successfully, it is only the
stop-with-savepoint request where I see this behavior.


Respectfully,


Jose Vargas

Software Engineer, Data Engineering

E: jose.var...@fiscalnote.com

fiscalnote.com   |  info.cq.com
  | rollcall.com 


taskmanager_thread_dump.json
Description: application/json


Re: Apache Flink - A question about Tables API and SQL interfaces

2021-05-13 Thread Austin Cawley-Edwards
Hi Mans,

There are currently no public APIs for doing so, though if you're willing
to deal with some breaking changes there are some experimental config
options for late events in the Table API and SQL, seen in the
WindowEmitStrategy class[1].

Best,
Austin

[1]:
https://github.com/apache/flink/blob/607919c6ea6983ae5ad3f82d63b7d6455c73d225/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala#L173-L211
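
For reference, a rough sketch of what using those experimental options might
look like. The keys are internal and undocumented, taken from that class as
far as I recall, and may change between releases; the duration values are
arbitrary examples.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LateFireSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        Configuration conf = tEnv.getConfig().getConfiguration();
        // Keep window state around so late events can still be applied.
        conf.setString("table.exec.emit.allow-lateness", "1 h");
        // Re-emit updated results when late events arrive, at most once per minute.
        conf.setString("table.exec.emit.late-fire.enabled", "true");
        conf.setString("table.exec.emit.late-fire.delay", "1 min");
    }
}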

On Wed, May 12, 2021 at 5:12 PM M Singh  wrote:

> Thanks Austin for your helpful references.
>
> I did take a look at [2]/[3] - but did not find anything relevant on
> searching for string 'late' (for allowed lateness etc) or side output.  So
> from my understanding the late events will be dropped if I am using Table
> API or SQL and the only option is to use datastream interface.  Please let
> me know if I missed anything.
>
> Thanks again.
>
>
> On Wednesday, May 12, 2021, 04:36:06 PM EDT, Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>
> Hi Mans,
>
> I don't believe there are explicit triggers/evictors/timers in the Table
> API/ SQL, as that is abstracted away from the lower-level DataStream API.
> If you need to get into the fine-grained details, Flink 1.13 has made some
> good improvements in going from the Table API to the DataStream API, and
> back again. [1]
>
> For working with time and lateness with Table API and SQL, some good
> places to look are the GroupBy Window Aggregation section of the Table API
> docs[2], as well as the SQL cookbook[3] and Ververica's SQL training
> wiki[4].
>
> Hope that helps,
> Austin
>
> [1]:
> https://flink.apache.org/news/2021/05/03/release-1.13.0.html#improved-interoperability-between-datastream-api-and-table-apisql
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#groupby-window-aggregation
> [3]:
> https://github.com/ververica/flink-sql-cookbook#aggregations-and-analytics
> [4]: https://github.com/ververica/sql-training/wiki/Queries-and-Time
>
> On Wed, May 12, 2021 at 1:30 PM M Singh  wrote:
>
> Hey Folks:
>
> I have the following questions regarding Table API/SQL in streaming mode:
>
> 1. Is there a notion of triggers/evictors/timers when using the Table API or
> SQL interfaces ?
> 2. Is there anything like side outputs and ability to define allowed
> lateness when dealing with the Table API or SQL interfaces ?
>
> If there are any alternate ways for the above when using Table API or SQL,
> please let me know where I can find the relevant documentation/examples.
>
> Thanks for your help.
>
> Mans
>
>
>
>
>


FlinkCEP: matching as many events as possible

2021-05-13 Thread lp
I have a FlinkCEP program that uses event time and monitors data of the following form:
[13/May/2021:20:45:36 +0800]
[13/May/2021:20:45:36 +0800]
[13/May/2021:20:45:37 +0800]
[13/May/2021:20:45:37 +0800]
[13/May/2021:20:45:50 +0800]



The key settings in the program are as follows:
watermark delay of 2s
skip strategy AfterMatchSkipStrategy.skipPastLastEvent()

.times(3)
.within(Time.seconds(3));



The result I get is:
detected 3 access in 60s from same ip...[/45:36, /45:36, /45:37]
late output data...[/45:37], the timeout occurred at timestamp: 2021-05-13 08:45:40


What I actually want is:
when the record [13/May/2021:20:45:50 +0800] arrives, I want a result like: detected 3 access in 60s from
same ip...[/45:36, /45:36, /45:37, /45:37],
because they all satisfy my .times(3).within(Time.seconds(3)) settings.

So what should I do?





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: How does JobManager terminate dangling task manager

2021-05-13 Thread Xintong Song
Hi narasimha,

For each TaskManager, there are two kinds of connections to the JobManager
process.
- One single connection to the ResourceManager, which allows RM to monitor
the slots' availability and assign them to Flink jobs.
- Connections to each JobMaster that the slots of this TM are assigned to.

Upon the JobMaster-TM disconnection, all tasks running on the TM that are
from the corresponding job are failed immediately. Take the Kafka source as
an example, that's where the task stops consuming data from Kafka.
Upon the RM-TM disconnection, TM kills itself if it cannot reconnect to the
RM within a certain time.
Since JobMaster and RM are in the same process, when one of the two
connections breaks, the other usually also breaks. In cases where it does not,
an RM-TM disconnection does not fail the running tasks until the reconnection
times out.

As for failover consistency, that is guaranteed by the checkpointing
mechanism. The new task does not resume from the exact position where the
old task is stopped. Instead, it resumes from the last successful
checkpoint.

Thank you~

Xintong Song



On Thu, May 13, 2021 at 5:38 PM Guowei Ma  wrote:

> Hi,
> In fact, not only will the JobManager (ResourceManager) kill a timed-out
> TaskManager, but if the TaskManager finds that it cannot connect to the
> JobManager (ResourceManager), it will also exit by itself.
> You can look at the time period during which the HB timeout occurred and
> what happened in the log. Under normal circumstances, I also look at what
> the GC situation was like at that time.
> Best,
> Guowei
>
>
> On Thu, May 13, 2021 at 11:06 AM narasimha  wrote:
>
>> Hi,
>>
>> Trying to understand how the JobManager kills a TaskManager that didn't
>> respond to heartbeats after a certain time.
>>
>> For example:
>>
>> If a network connection b/w JobManager and TaskManager is lost for some
>> reason, the JobManager will bring up another TaskManager after the
>> heartbeat timeout.
>> In such a case, how does the JobManager make sure all connections (e.g. to
>> Kafka) from the lost TaskManager are cut off, and that the new one will take over from a
>> certain consistent point?
>>
>> Also want to learn ways to debug what caused the timeout, our job fairly
>> handles 5k records/s, not a heavy traffic job.
>> --
>> A.Narasimha Swamy
>>
>


Type conversion question: how to convert String to DECIMAL

2021-05-13 Thread WeiXubin
The data type received on the source side is String, and the sink-side MySQL column is defined as decimal(12, 2). When writing the
INSERT statement, how should the String be converted to decimal? I tried Flink's built-in CAST
and it did not work. Does anyone have a good approach?
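
For reference, besides CAST(col AS DECIMAL(12, 2)), a fallback I am
considering is a small scalar UDF that does the conversion explicitly. This
is only a sketch; the function name and the rounding behavior are my own
choices.

import java.math.BigDecimal;
import java.math.RoundingMode;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

// Convert a string amount to DECIMAL(12, 2), returning NULL for unparsable input.
public class ToDecimal extends ScalarFunction {

    public @DataTypeHint("DECIMAL(12, 2)") BigDecimal eval(String s) {
        if (s == null) {
            return null;
        }
        try {
            return new BigDecimal(s.trim()).setScale(2, RoundingMode.HALF_UP);
        } catch (NumberFormatException e) {
            return null;
        }
    }
}

It could then be registered with
tableEnv.createTemporarySystemFunction("TO_DEC", ToDecimal.class) and used as
TO_DEC(amount_str) in the INSERT statement.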



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Reply: Problem writing to HBase with Flink SQL

2021-05-13 Thread 酷酷的浑蛋
No, I found the cause: it comes from multiple levels of nested function calls. Flink's original type was NOT NULL and cannot be converted to STRING. This error message is really painful and confusing.

















On 2021-05-13 10:09:49, "allanqinjy" wrote:
>Judging from the exception alone, you probably inserted a null value. Add a filter before writing to HBase; for example, if your rowkey is null, inserting into HBase should not work. You can give it a try.
>
>
>allanqinjy
>allanqi...@163.com
>
>
>On 2021-05-12 19:23, 酷酷的浑蛋 wrote:
>Mismatch of function's argument data type 'STRING NOT NULL' and actual 
>argument type 'STRING'. The SQL is quite long; roughly, the above error was reported while executing the INSERT-into-HBase SQL. What causes this kind of error?


Re: How does JobManager terminate dangling task manager

2021-05-13 Thread Guowei Ma
Hi,
In fact, not only will the JobManager (ResourceManager) kill a timed-out
TaskManager, but if the TaskManager finds that it cannot connect to the
JobManager (ResourceManager), it will also exit by itself.
You can look at the time period during which the HB timeout occurred and
what happened in the log. Under normal circumstances, I also look at what
the GC situation was like at that time.
Best,
Guowei


On Thu, May 13, 2021 at 11:06 AM narasimha  wrote:

> Hi,
>
> Trying to understand how the JobManager kills a TaskManager that didn't respond
> to heartbeats after a certain time.
>
> For example:
>
> If a network connection b/w JobManager and TaskManager is lost for some
> reason, the JobManager will bring up another TaskManager after the
> heartbeat timeout.
> In such a case, how does the JobManager make sure all connections (e.g. to
> Kafka) from the lost TaskManager are cut off, and that the new one will take over from a
> certain consistent point?
>
> Also want to learn ways to debug what caused the timeout, our job fairly
> handles 5k records/s, not a heavy traffic job.
> --
> A.Narasimha Swamy
>


Re: Nested match_recognize query not supported in SQL ?

2021-05-13 Thread David Anderson
By the way, views that use MATCH_RECOGNIZE don't work in Flink 1.11. [1]

[1] https://issues.apache.org/jira/browse/FLINK-20077

On Thu, May 13, 2021 at 11:06 AM David Anderson 
wrote:

> I was able to get something like this working, but only by introducing a
> view:
>
> CREATE TEMPORARY VIEW mmm AS SELECT id FROM events MATCH_RECOGNIZE (...);
>
> SELECT * FROM event WHERE id IN (SELECT id FROM mmm);
>
> Regards,
> David
>
> On Tue, May 11, 2021 at 9:22 PM Tejas  wrote:
>
>> Hi,
>> I am using flink 1.11 and trying nested query where match_recognize is
>> inside, as shown below :
>> select * from events where id = (SELECT * FROM events MATCH_RECOGNIZE
>> (PARTITION BY org_id ORDER BY proctime MEASURES A.id AS startId ONE ROW
>> PER
>> MATCH PATTERN (A C* B) DEFINE A AS A.tag = 'tag1', C AS C.tag <> 'tag2', B
>> AS B.tag = 'tag2'));
>>
>> And I am getting an error as :
>> org.apache.calcite.sql.validate.SqlValidatorException: Table 'A' not
>> found
>>
>> Is this not supported ? If not what's the alternative ?
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


How can Flink SQL convert a changelog stream into an append-only stream?

2021-05-13 Thread casel.chen
How can Flink SQL convert a changelog stream into an append-only stream?
After ingesting a changelog stream via CDC, window aggregations such as Tumble, Hop, and Session windows can no longer be used downstream; aggregation is only possible via state TTL +
GROUP BY timestamp.
Is there any way to convert the changelog stream into an append-only stream so that the above window aggregations can be used? Thanks!

Re: LeaseRenewer threads not released in Flink 1.11

2021-05-13 Thread zilong xiao
We never pinpointed the exact cause; we can only mitigate it by restarting the job...

zhisheng wrote on Thursday, May 13, 2021 at 4:20 PM:

> Hi, was this problem ever tracked down in the end?
>
> We also have a job in production with this issue. The Flink version is 1.10.0; this job's JM has a very large number of threads (almost 6k). The job uses Flink to read from
> Kafka, joins against HBase, and has checkpointing enabled. Only this job has the problem, which is very strange.
>
> https://tva1.sinaimg.cn/large/008i3skNgy1gqgvhdu674j31je0u0795.jpg
>
> zilong xiao wrote on Tuesday, December 8, 2020 at 6:21 PM:
>
> > The job's data flow is kafka -> flink ->
> > http/prometheus. There are many jobs of this type at the moment, but only those few have the problem, and it reproduces every time; each time the only remedy is to restart and then watch the thread count climb again.
> > I'll keep debugging~
> >
> > Paul Lam wrote on Tuesday, December 8, 2020 at 6:00 PM:
> >
> > > Hi,
> > >
> > > The multiple-cluster situation I mentioned earlier mainly refers to writing data to HDFS. If only checkpointing depends on HDFS and this still happens, it is indeed very strange.
> > >
> > > Best,
> > > Paul Lam
> > >
> > > > On 2020-12-08 11:03, zilong xiao wrote:
> > > >
> > > > Hi Paul,
> > > > The thread names are exactly identical, all user1@cluserA. The HDFS client version is transparent to the user; the job uses Flink
> > > > 1.11, and that Flink version seems to use HDFS 2.8.1. Checkpointing is the only thing in Flink I can think of that continuously interacts with the cluster, and even with DEBUG logging enabled I could not find the root
> > > > cause.
> > > >
> > > > Also, I don't quite understand your statement that "the number of threads should equal the number of HDFS clusters used"; a job can only be submitted to one specific cluster, right?
> > > >
> > > > Paul Lam wrote on Tuesday, December 8, 2020 at 10:45 AM:
> > > >
> > > >> As I recall, LeaseRenewer is JVM-level, and the number of threads should equal the number of HDFS clusters used.
> > > >>
> > > >> Could you check whether their exact thread names are completely identical (for example, all user1@cluserA)? And what version is the HDFS client?
> > > >>
> > > >> Best,
> > > >> Paul Lam
> > > >>
> > > >>> On 2020-12-07 18:11, zilong xiao wrote:
> > > >>>
> > > >>> In production we found that a few Flink SQL 1.11 jobs have a very high container thread count. The thread
> > > >>> dump shows many threads named LeaseRenewer
> > > >>> in TIMED_WAITING state. So far we can only reproduce the symptom but cannot pinpoint the cause. Has anyone in the community run into something similar?
> > > >>>
> > > >>> Flink version: 1.11
> > > >>> State backend: filesystem
> > > >>> checkpoint interval: 60s
> > > >>
> > > >>
> > >
> > >
> >
>


Re: Nested match_recognize query not supported in SQL ?

2021-05-13 Thread David Anderson
I was able to get something like this working, but only by introducing a
view:

CREATE TEMPORARY VIEW mmm AS SELECT id FROM events MATCH_RECOGNIZE (...);

SELECT * FROM event WHERE id IN (SELECT id FROM mmm);

Regards,
David

On Tue, May 11, 2021 at 9:22 PM Tejas  wrote:

> Hi,
> I am using flink 1.11 and trying nested query where match_recognize is
> inside, as shown below :
> select * from events where id = (SELECT * FROM events MATCH_RECOGNIZE
> (PARTITION BY org_id ORDER BY proctime MEASURES A.id AS startId ONE ROW PER
> MATCH PATTERN (A C* B) DEFINE A AS A.tag = 'tag1', C AS C.tag <> 'tag2', B
> AS B.tag = 'tag2'));
>
> And I am getting an error as :
> org.apache.calcite.sql.validate.SqlValidatorException: Table 'A' not
> found
>
> Is this not supported ? If not what's the alternative ?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Reply: LeaseRenewer threads not released in Flink 1.11

2021-05-13 Thread 5599
[Body lost to encoding corruption; it appears to mention Flink and Java.]




----
From: "zhisheng"
https://tva1.sinaimg.cn/large/008i3skNgy1gqgvhdu674j31je0u0795.jpg

zilong xiao 

Re: LeaseRenewer threads not released in Flink 1.11

2021-05-13 Thread zhisheng
Hi, was this problem ever tracked down in the end?

We also have a job in production with this issue. The Flink version is 1.10.0; this job's JM has a very large number of threads (almost 6k). The job uses Flink to read from
Kafka, joins against HBase, and has checkpointing enabled. Only this job has the problem, which is very strange.

https://tva1.sinaimg.cn/large/008i3skNgy1gqgvhdu674j31je0u0795.jpg

zilong xiao wrote on Tuesday, December 8, 2020 at 6:21 PM:

> The job's data flow is kafka -> flink ->
> http/prometheus. There are many jobs of this type at the moment, but only those few have the problem, and it reproduces every time; each time the only remedy is to restart and then watch the thread count climb again. I'll keep debugging~
>
> Paul Lam wrote on Tuesday, December 8, 2020 at 6:00 PM:
>
> > Hi,
> >
> > The multiple-cluster situation I mentioned earlier mainly refers to writing data to HDFS. If only checkpointing depends on HDFS and this still happens, it is indeed very strange.
> >
> > Best,
> > Paul Lam
> >
> > > On 2020-12-08 11:03, zilong xiao wrote:
> > >
> > > Hi Paul,
> > > The thread names are exactly identical, all user1@cluserA. The HDFS client version is transparent to the user; the job uses Flink
> > > 1.11, and that Flink version seems to use HDFS 2.8.1. Checkpointing is the only thing in Flink I can think of that continuously interacts with the cluster, and even with DEBUG logging enabled I could not find the root
> > > cause.
> > >
> > > Also, I don't quite understand your statement that "the number of threads should equal the number of HDFS clusters used"; a job can only be submitted to one specific cluster, right?
> > >
> > > Paul Lam wrote on Tuesday, December 8, 2020 at 10:45 AM:
> > >
> > >> As I recall, LeaseRenewer is JVM-level, and the number of threads should equal the number of HDFS clusters used.
> > >>
> > >> Could you check whether their exact thread names are completely identical (for example, all user1@cluserA)? And what version is the HDFS client?
> > >>
> > >> Best,
> > >> Paul Lam
> > >>
> > >>> On 2020-12-07 18:11, zilong xiao wrote:
> > >>>
> > >>> In production we found that a few Flink SQL 1.11 jobs have a very high container thread count. The thread
> > >>> dump shows many threads named LeaseRenewer
> > >>> in TIMED_WAITING state. So far we can only reproduce the symptom but cannot pinpoint the cause. Has anyone in the community run into something similar?
> > >>>
> > >>> Flink version: 1.11
> > >>> State backend: filesystem
> > >>> checkpoint interval: 60s
> > >>
> > >>
> >
> >
>


Re: Regarding Stateful Functions

2021-05-13 Thread Jessy Ping
Hi Austin,


Thanks for your insights.


We are currently following a microservice architecture for accomplishing
our data processing requirements. We are planning to use Flink as our
unified platform for all data processing tasks. Although most of our use
cases are a suitable fit for Flink, there is one use case that needs some
extra deep dive into the capabilities of Flink.


As I mentioned in my previous email, the processing flow of the use case in
discussion is as follows,


*ingress(>=10k/s)--> First transformation based on certain static rules -->
second transformation based on certain dynamic rules --> Third and final
transformation based on certain dynamic and static rules --> egress*


In our current design, we are using a microservice embedded Hazelcast
cluster. It's a complex system with several stability issues. We are
looking for an alternative solution based on open sources, and it seems
like the stateful function powered by Flink is an ideal candidate. The
following features of 'Stateful Functions' attracted us,

1. Consistent State.

2. No Database Required

3. Exactly once semantics.

4. Logical Addressing

5. Multi-language support.


Any additional insights in the already mentioned questions are helpful.

Thanks

Jessy

On Thu, 13 May 2021 at 04:25, Austin Cawley-Edwards 
wrote:

> Hey Jessy,
>
> I'm not a Statefun expert but, hopefully, I can point you in the right
> direction for some of your questions. I'll also cc Gordan, who helps to
> maintain Statefun.
>
> *1. Is the stateful function a good candidate for a system(as above) that
>> should process incoming requests at the rate of 10K/s depending on various
>> dynamic rules and static rules*
>> *? *
>>
>
> The scale is definitely manageable in a Statefun cluster, and could
> possibly be a good fit for dynamic and static rules. Hopefully Gordon can
> comment more there. For the general Flink solution to this problem, I
> always turn to this great series of blog posts around fraud detection with
> dynamic rules[1].
>
> 2.* Is Flink capable of accommodating the above-mentioned dynamic rules
>> in its states (about 1500 rules per Keyed Event ) for the faster
>> transformation of incoming streams? *
>>
>
> This may be manageable as well, depending on how you are applying these
> rules and what they look like (size, etc.). Can you give any more
> information there?
>
>
> *3.** I**f we are not interested in using AWS lambda or Azure functions,
>> what are the other options?. What about using co-located functions and
>> embedded functions? * *Is there any benefit in using one over the other
>> for my data processing flow?*
>>
>
> Yes, you can embed JVM functions via Embedded Modules[2], which in your
> case might benefit from the Flink DataStream integration[3]. You can also
> host remote functions anywhere, i.e. Kubernetes, behind an NGINX server,
> etc. The Module Configuration section[4] will likely shed more light on
> what is available. I think the main tradeoffs here are availability,
> scalability, and network latency for external functions.
>
> 4*.If we are going with embedded functions/co-located functions, is it
>> possible to autoscale the application using the recently released reactive
>> mode in Flink 1.13?*
>>
>
> Statefun 3.0 uses Flink 1.12 but is expected to upgrade to Flink 1.13 in
> the next release cycle. There are a few other changes that are necessary to
> be compatible with Reactive Mode (i.e., make the Statefun cluster a regular
> Flink application, tracked in FLINK-16930 [5]), but it's coming!
>
>
> On a higher note, what made you interested in Statefun for this use case?
> The community is currently trying to expand our understanding of potential
> users, so it would be great to hear a bit more!
>
> Best,
> Austin
>
> [1]: https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
> [2]:
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/deployment/embedded/#embedded-module-configuration
> [3]:
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/sdk/flink-datastream/
> [4]:
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/deployment/module/#module-configuration
> [5]: https://issues.apache.org/jira/browse/FLINK-16930
>
> On Wed, May 12, 2021 at 11:53 AM Jessy Ping 
> wrote:
>
>> Hi all,
>>
>>
>> I have gone through the stateful function's documentation and required
>> some expert advice or clarification regarding the following points.
>>
>>
>> *Note: My data processing flow is as follows,*
>>
>>
>> *ingress(10k/s)--> First transformation based on certain static rules -->
>> second transformation based on certain dynamic rules --> Third and final
>> transformation based on certain dynamic and static rules --> egress*
>>
>>
>> *Questions*
>>
>> *1. Is the stateful function a good candidate for a system(as above) that
>> should process incoming requests at the rate of 10K/s depending on various
>> dynamic rules and static rules**? *
>>

Reply: flink on k8s native question

2021-05-13 Thread [sender name lost to encoding corruption]
Hi Yang,


[Most of the question body was lost to encoding corruption. It concerns running the examples on a Flink session cluster on k8s versus application mode, with the following application-mode submission:]








flink run-application \
--target kubernetes-application \
-Dkubernetes.namespace=flink-session-cluster \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=flink:latest \
local:///opt/flink/examples/batch/WordCount.jar



[The descriptions of the resulting deployment, service, and pod were lost to encoding corruption, as was the text about the configmap flink-config-my-first-application-cluster and its `kubectl describe` output.]

The `kubectl describe pods` output is:


Name:
my-first-application-cluster-6c85b474-64t6x
Namespace:  flink-session-cluster
Priority:  0
Node:hkctttl104/10.199.252.104
Start Time: Thu, 13 May 2021 22:16:03 +0800
Labels:   app=my-first-application-cluster
   component=jobmanager
   pod-template-hash=6c85b474
   type=flink-native-kubernetes
Annotations: [truncated by encoding corruption]

[Quoted from the earlier message:] Following the RBAC section of the docs
(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#rbac),
permissions to create and delete pods were granted:

 kubectl create namespace flink-session-cluster

 kubectl create serviceaccount flink -n flink-session-cluster

 kubectl create clusterrolebinding flink-role-binding-flink \ --clusterrole
 =edit \ --serviceaccount=flink-session-cluster:flink


 [Steps quoted from the earlier message; some headings lost to encoding corruption.]

 3.1 Start the session cluster:

 ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
 \
  -Dkubernetes.namespace=flink-session-cluster \
  -Dkubernetes.jobmanager.service-account=flink \
  -Dkubernetes.service.exposed.type=NodePort

 [Startup output lost to encoding corruption.]


 3.2 Submit the Flink job:

 The command:
 ./bin/flink run \
  --target kubernetes-session \
  -Dkubernetes.namespace=flink-session-cluster \
  
-Dkubernetes.cluster-id=my-first-flink-cluster \
  -Dkubernetes.jobmanager.service-account=flink \
  ./examples/streaming/TopSpeedWindowing.jar

 The result:

 Executing TopSpeedWindowing example with default input data set.
 Use --input to specify file input.
 Printing result to stdout. Use --output to specify output path.
 WARNING: An illegal reflective access operation has occurred
 WARNING: Illegal reflective access by
 org.apache.flink.api.java.ClosureCleaner
 (file:/home/dsi/soft/flink/flink-1.12.2/lib/flink-dist_2.11-1.12.2.jar) to
 field java.lang.String.value
 WARNING: Please consider reporting this to the maintainers of
 org.apache.flink.api.java.ClosureCleaner
 WARNING: Use --illegal-access=warn to enable warnings of further illegal
 reflective access operations
 WARNING: All illegal access operations will be denied in a future release
 2021-05-12 15:51:30,453 INFO
 
org.apache.flink.kubernetes.KubernetesClusterDescriptor
 [] - Retrieve
 flink cluster my-first-flink-cluster successfully, JobManager Web
 Interface: http://10.199.252.101:8081

 
 The program finished with the following exception:

 org.apache.flink.client.program.ProgramInvocationException: The main
 method caused an error: Failed to execute job 
'CarTopSpeedWindowingExample'.
 at
 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
 at
 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
 at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
 at
 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
 at
 org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
 at
 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
 at
 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
 Caused by: org.apache.flink.util.FlinkException: Failed to execute job
 'CarTopSpeedWindowingExample'.
 at
 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
 at
 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
 at
 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
 at
 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
 at
 
org.apache.flink.streaming.examples.windowing.TopSpeedWindowing.main(TopSpeedWindowing.java:99)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
 Method)
 at
 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
 

Re: Flink sql task failure recovery does not work.

2021-05-13 Thread Guowei Ma
Hi
I think you could configure a restart strategy[1], e.g.
restart-strategy: fixed-delay

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/#fixed-delay-restart-strategy
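
A minimal sketch of setting the same thing programmatically; the attempt
count and delay below are arbitrary example values.

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategySketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Equivalent flink-conf.yaml keys:
        //   restart-strategy: fixed-delay
        //   restart-strategy.fixed-delay.attempts: 3
        //   restart-strategy.fixed-delay.delay: 10 s
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    }
}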
Best,
Guowei


On Thu, May 13, 2021 at 12:02 PM 1095193...@qq.com <1095193...@qq.com>
wrote:

> Hi team,
>Following the Task Failure Recovery document
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/
>  , I have enabled the state.checkpoints.dir parameter in flink-conf.yaml.
>
> state.checkpoints.dir: hdfs://172.16.1.192:9000/flink-checkpoints
> state.savepoints.dir: hdfs://172.16.1.192:9000/flink-savepoints
>
> However, the Flink SQL task still throws the exception 
> org.apache.flink.runtime.JobException:
> Recovery is suppressed by NoRestartBackoffTimeStrategy.  Apparently, no
> restart strategy is enabled. Do we need to enable other configuration besides
> state.checkpoints.dir?  Thanks for any suggestions.
>
> --
> 1095193...@qq.com
>


Re: Need Clarity about Checkpoint for Flink-1.12.2

2021-05-13 Thread Guowei Ma
Hi Sudhansu,
I think you do not need to set the config in flink-conf.
Best,
Guowei


On Thu, May 13, 2021 at 1:06 PM sudhansu jena 
wrote:

> Hi Team,
>
> We have recently enabled checkpointing in our Flink job using
> FsStateBackend pointing to an S3 bucket.
>
> Below is the sample code for enabling checkpointing through application code;
> we are using Flink version 1.12.2.
>
> env.setStateBackend(new
> FsStateBackend("s3://flinkcheckpointing/job-name/",true));
> env.enableCheckpointing(1000);
> Class<?> unmodColl =
> Class.forName("java.util.Collections$UnmodifiableCollection");
> env.getConfig().addDefaultKryoSerializer(unmodColl,
> UnmodifiableCollectionsSerializer.class);
> CheckpointConfig config = env.getCheckpointConfig();
>
> config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
>
> The query is , do we still need to set the below config in flink-conf.yaml
> for checkpointing to work.
>
> *state.checkpoints.dir: s3://prod-flink-checkpointing/checkpoint-metadata/*
>
>
> Thanks,
> Sudhansu
>