Re: Role of Rowtime Field Task?

2021-02-20 Thread Yuval Itzchakov
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html#event-time

On Sun, Feb 21, 2021 at 8:43 AM Aeden Jameson 
wrote:

> In my job graph viewed through the Flink UI I see a task named,
>
> rowtime field: (#11: event_time TIME ATTRIBUTE(ROWTIME))
>
> that has an upstream Kafka source task. What exactly does the rowtime task
> do?
>
> --
> Thank you,
> Aeden
>


-- 
Best Regards,
Yuval Itzchakov.


Configure operator based on key

2021-02-20 Thread Abhinav Sharma
Hi,

Is there some way to configure an operator based on the key in a
stream?
E.g.: if the key is 'abcd', create a count window of size X; if the key
is 'bfgh', create a count window of size Y.

Is this scenario possible in Flink?


Role of Rowtime Field Task?

2021-02-20 Thread Aeden Jameson
In my job graph viewed through the Flink UI I see a task named,

rowtime field: (#11: event_time TIME ATTRIBUTE(ROWTIME))

that has an upstream Kafka source task. What exactly does the rowtime task do?

-- 
Thank you,
Aeden


Re: Container is running beyond physical memory limits

2021-02-20 Thread Eleanore Jin
Hi,
Here is an article on analyzing OOM kills that I came across earlier; I'm not sure whether it helps in your case:

http://www.whitewood.me/2021/01/02/%E8%AF%A6%E8%A7%A3-Flink-%E5%AE%B9%E5%99%A8%E5%8C%96%E7%8E%AF%E5%A2%83%E4%B8%8B%E7%9A%84-OOM-Killed/

On Thu, Feb 18, 2021 at 9:01 AM lian  wrote:

> Hi all:
> 1. Background: We use Flink SQL to implement a retract stream: LAST_VALUE feeds
> a second-level SUM aggregation, and the retract mechanism subtracts values that
> have already been emitted downstream.
>   The functionality is essentially the same as the Cainiao case described here:
>   https://developer.aliyun.com/article/457392
> 2. Version and tuning: Flink 1.10.1, 2-6 GB per slot, RocksDB state backend for
> state storage. We have repeatedly tuned Flink's managed memory:
>    - JVM overhead memory: min 1 GB, max 2 GB
>    - block cache size: 8 MB (default) -> 128 MB
>    - block size: 4 KB (default) -> 32 KB
>    - flush threads: 1 (default) -> 4
>    - write buffer ratio: 0.5 (default) -> 0.25
>    - enabled batch size and two-phase aggregation parameters
>    - incremental checkpoints
>    - memory pre-allocation set to false
> 3. State size: checkpoints fluctuate between 100 MB and 25 GB; savepoints reach 120 GB.
> 4. Problems:
>    1. Memory runs out and the container gets killed.
>    2. During checkpoints some states are very large and others very small; what causes this?
> 5. I have read many articles about containers being killed by OOM, adjusted the
> overhead parameters, increased TM memory to enlarge managed memory, tuned the
> RocksDB parameters, and so on; the container is still killed after the job runs
> for a while.
> 6. Remaining question: the RocksDB metrics options are enabled, but I am not
> sure how to print and monitor them; it would be great if a future Flink UI
> could display them.
> So it is still unclear which part of memory is being exceeded.
>
> Could anyone help take a look at how best to optimize this?
> In the Flink Forward 2020 talk on RocksDB, Yun Tang mentioned that increasing
> overhead memory can mitigate this problem first, but it has not solved it for
> me; he also mentioned that improper usage can cause it, and I am not sure
> whether my scenario is one of those cases.
>
> Looking forward to a reply~
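For reference, the tunings listed in the mail above correspond to the following flink-conf.yaml keys in Flink 1.10 (a sketch only; the batch/two-phase aggregation and pre-allocation settings are configured elsewhere and omitted here):

```yaml
taskmanager.memory.jvm-overhead.min: 1g
taskmanager.memory.jvm-overhead.max: 2g
state.backend.incremental: true                        # incremental checkpoints
state.backend.rocksdb.block.cache-size: 128mb          # default 8mb
state.backend.rocksdb.block.blocksize: 32kb            # default 4kb
state.backend.rocksdb.thread.num: 4                    # background flush/compaction threads
state.backend.rocksdb.memory.write-buffer-ratio: 0.25  # default 0.5
```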


Best way to handle BIGINT to TIMESTAMP conversions

2021-02-20 Thread Sebastián Magrí
I have a table with two BIGINT fields for start and end of an event as UNIX
time in milliseconds. I want to be able to have a resulting column with the
delta in milliseconds and group by that difference. Also, I want to be able
to have aggregations with window functions based upon the `end` field.

The table definition looks like this:
CREATE TABLE sessions (
  `ats`   STRING,
  `e` BIGINT,
  `s` BIGINT,
  `proc_time` AS PROCTIME(),
  PRIMARY KEY (`ats`, `s`, `e`) NOT ENFORCED
)

Then I have a few views like this:

CREATE VIEW second_sessions AS
  SELECT * FROM sessions
  WHERE `e` - `s` = 1000

And some windows using these views like this:

  WINDOW w3m AS (
PARTITION BY `t`
ORDER BY `proc_time`
RANGE BETWEEN INTERVAL '3' MINUTE PRECEDING AND CURRENT ROW
  )

I'd like to use the `e` field for windowing instead of `proc_time`, but I
keep running into errors about the `TO_TIMESTAMP(BIGINT)` overload being
missing, or about unsupported timestamp arithmetic.

What is the best practice for a case such as this?

Best Regards,
-- 
Sebastián Ramírez Magrí
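For reference, the pattern documented for Flink 1.12 is to convert epoch values through `FROM_UNIXTIME` in a computed column and declare that column as the event-time attribute; `TO_TIMESTAMP` itself no longer accepts a BIGINT. A sketch along those lines, reusing the column names from the mail above (the 5-second watermark delay is an assumption, and connector options are omitted as in the original DDL):

```sql
CREATE TABLE sessions (
  `ats`   STRING,
  `e`     BIGINT,  -- event end, epoch milliseconds
  `s`     BIGINT,  -- event start, epoch milliseconds
  -- FROM_UNIXTIME expects seconds, hence the division by 1000
  `end_ts` AS TO_TIMESTAMP(FROM_UNIXTIME(`e` / 1000)),
  -- makes end_ts an event-time (rowtime) attribute usable in windows
  WATERMARK FOR `end_ts` AS `end_ts` - INTERVAL '5' SECOND,
  PRIMARY KEY (`ats`, `s`, `e`) NOT ENFORCED
);

-- the delta stays plain BIGINT arithmetic and can be grouped on directly
SELECT `ats`, `e` - `s` AS duration_ms
FROM sessions;
```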


Re: stop job with Savepoint

2021-02-20 Thread Alexey Trenikhun
Adding "list" to the role's verbs helps. Do I need to add anything else?
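For reference, a minimal RBAC setup along those lines might look like this (the namespace `n` and service account `fsp` are taken from the log below; the exact verb set Flink needs can vary by version, so this is a sketch rather than an authoritative list):

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: n
  name: flink-ha
rules:
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  namespace: n
  name: flink-ha-binding
subjects:
  - kind: ServiceAccount
    name: fsp
    namespace: n
roleRef:
  kind: Role
  name: flink-ha
  apiGroup: rbac.authorization.k8s.io
```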


From: Alexey Trenikhun 
Sent: Saturday, February 20, 2021 2:10 PM
To: Flink User Mail List 
Subject: stop job with Savepoint

Hello,
I'm running a per-job Flink cluster. The JM is deployed as a Kubernetes Job with 
restartPolicy: Never, and high availability uses KubernetesHaServicesFactory. The 
job runs fine for some time and the configmaps are created, etc. Now, in order to 
upgrade the Flink job, I'm trying to stop it with a savepoint (flink stop $JOB_ID), 
but the JM exits with code 2. From the log:

{"ts":"2021-02-20T21:34:18.195Z","message":"Terminating cluster entrypoint 
process StandaloneApplicationClusterEntryPoint with exit code 
2.","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":2,"stack_trace":"java.util.concurrent.ExecutionException:
 io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET 
at: 
https://10.96.0.1/api/v1/namespaces/n/configmaps?labelSelector=app%3Dfsp%2Cconfigmap-type%3Dhigh-availability%2Ctype%3Dflink-native-kubernetes.
 Message: Forbidden!Configured service account doesn't have access. Service 
account may have been revoked. configmaps is forbidden: User 
\"system:serviceaccount:n:fsp\" cannot list resource \"configmaps\" in API 
group \"\" in the namespace \"n\".\n\tat 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)\n\tat
 java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat 
org.apache.flink.kubernetes.highavailability.KubernetesHaServices.internalCleanup(KubernetesHaServices.java:142)\n\tat
 
org.apache.flink.runtime.highavailability.AbstractHaServices.closeAndCleanupAllData(AbstractHaServices.java:180)\n\tat
 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.stopClusterServices(ClusterEntrypoint.java:378)\n\tat
 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$shutDownAsync$3(ClusterEntrypoint.java:467)\n\tat
 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$composeAfterwards$19(FutureUtils.java:704)\n\tat
 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)\n\tat
 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)\n\tat
 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat
 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)\n\tat
 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$null$18(FutureUtils.java:715)\n\tat
 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)\n\tat
 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)\n\tat
 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat
 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)\n\tat
 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent.lambda$closeAsyncInternal$3(DispatcherResourceManagerComponent.java:182)\n\tat
 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)\n\tat
 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)\n\tat
 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat
 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)\n\tat
 
org.apache.flink.runtime.concurrent.FutureUtils$CompletionConjunctFuture.completeFuture(FutureUtils.java:956)\n\tat
 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)\n\tat
 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)\n\tat
 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat
 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)\n\tat
 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$forwardTo$22(FutureUtils.java:1323)\n\tat
 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)\n\tat
 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)\n\tat
 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat
 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
 java.lang.Thread.run(Thread.java:748)\nCaused by: 
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET 
at: 
https://10.96.0.1/api/v1/namespaces/n/configmaps?labelSelector=app%3Dfsp%2Cconfigmap-type%3Dhigh-availability%2Ctype%3Dflink-native-kubernetes.
 Message: Forbidden!Configured service account doesn't have access. Service 
account may have been revoked. configmaps is forbidden: User 
\"system:serviceaccount:n:fsp\" cannot list resource 

stop job with Savepoint

2021-02-20 Thread Alexey Trenikhun
Hello,
I'm running a per-job Flink cluster. The JM is deployed as a Kubernetes Job with 
restartPolicy: Never, and high availability uses KubernetesHaServicesFactory. The 
job runs fine for some time and the configmaps are created, etc. Now, in order to 
upgrade the Flink job, I'm trying to stop it with a savepoint (flink stop $JOB_ID), 
but the JM exits with code 2. From the log:

{"ts":"2021-02-20T21:34:18.195Z","message":"Terminating cluster entrypoint 
process StandaloneApplicationClusterEntryPoint with exit code 
2.","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":2,"stack_trace":"java.util.concurrent.ExecutionException:
 io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET 
at: 
https://10.96.0.1/api/v1/namespaces/n/configmaps?labelSelector=app%3Dfsp%2Cconfigmap-type%3Dhigh-availability%2Ctype%3Dflink-native-kubernetes.
 Message: Forbidden!Configured service account doesn't have access. Service 
account may have been revoked. configmaps is forbidden: User 
\"system:serviceaccount:n:fsp\" cannot list resource \"configmaps\" in API 
group \"\" in the namespace \"n\".\n\tat 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)\n\tat
 java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat 
org.apache.flink.kubernetes.highavailability.KubernetesHaServices.internalCleanup(KubernetesHaServices.java:142)\n\tat
 
org.apache.flink.runtime.highavailability.AbstractHaServices.closeAndCleanupAllData(AbstractHaServices.java:180)\n\tat
 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.stopClusterServices(ClusterEntrypoint.java:378)\n\tat
 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$shutDownAsync$3(ClusterEntrypoint.java:467)\n\tat
 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$composeAfterwards$19(FutureUtils.java:704)\n\tat
 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)\n\tat
 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)\n\tat
 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat
 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)\n\tat
 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$null$18(FutureUtils.java:715)\n\tat
 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)\n\tat
 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)\n\tat
 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat
 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)\n\tat
 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent.lambda$closeAsyncInternal$3(DispatcherResourceManagerComponent.java:182)\n\tat
 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)\n\tat
 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)\n\tat
 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat
 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)\n\tat
 
org.apache.flink.runtime.concurrent.FutureUtils$CompletionConjunctFuture.completeFuture(FutureUtils.java:956)\n\tat
 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)\n\tat
 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)\n\tat
 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat
 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)\n\tat
 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$forwardTo$22(FutureUtils.java:1323)\n\tat
 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)\n\tat
 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)\n\tat
 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat
 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
 java.lang.Thread.run(Thread.java:748)\nCaused by: 
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET 
at: 
https://10.96.0.1/api/v1/namespaces/n/configmaps?labelSelector=app%3Dfsp%2Cconfigmap-type%3Dhigh-availability%2Ctype%3Dflink-native-kubernetes.
 Message: Forbidden!Configured service account doesn't have access. Service 
account may have been revoked. configmaps is forbidden: User 
\"system:serviceaccount:n:fsp\" cannot list resource \"configmaps\" in API 
group \"\" in the namespace \"n\".\n\tat 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:568)\n\tat
 

Flink job finished unexpected

2021-02-20 Thread Rainie Li
Hello,

I launched a job with a larger load on a Hadoop YARN cluster.
The job finished unexpectedly after running for 5 hours. I didn't find any
error in the JobManager log besides this connection exception:

2021-02-20 13:20:14,110 WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [/10.1.57.146:48368] failed with java.io.IOException: Connection reset by peer
2021-02-20 13:20:14,110 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@host:35241] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2021-02-20 13:20:14,110 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@host:39493] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2021-02-20 13:20:14,110 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@host:38481] has failed, address is now gated for [50] ms. Reason: [Disassociated]

Any idea what caused the job to finish and how to resolve it?
Any suggestions are appreciated.

Thanks
Best regards
Rainie


Re: [Flink SQL] FLOOR(timestamp TO WEEK) not working

2021-02-20 Thread Sebastián Magrí
I'm using the latest Flink 1.12, and the timestamp precision comes from
Debezium, which I believe emits standard ISO-8601 timestamps.

On Thu, 18 Feb 2021 at 16:19, Timo Walther  wrote:

> Hi Sebastián,
>
> which Flink version are you using? And which precision do the timestamps
> have?
>
> This looks clearly like a bug to me. We should open an issue in JIRA.
>
> Regards,
> Timo
>
> On 18.02.21 16:17, Sebastián Magrí wrote:
> > While using said function in a query I'm getting a query compilation
> > error saying that there's no applicable method for the given arguments.
> > The parameter types displayed in the error are
> >
> > org.apache.flink.table.data.TimestampData,
> > org.apache.flink.table.data.TimestampData
> >
> > And there's no overload listed for
> > `org.apache.calcite.runtime.SqlFunctions.floor` that matches those
> > parameters.
> >
> > Is this supported at all?
> > Is there anything in particular which should be imported or added to the
> > build to make this work?
> >
> > --
> > Sebastián Ramírez Magrí
>
>

-- 
Sebastián Ramírez Magrí
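If `FLOOR(ts TO WEEK)` keeps failing at code generation, one possible workaround (a sketch only, not verified across Flink versions) is to derive the week start from `DAYOFWEEK` and `TIMESTAMPADD`, both of which are documented Flink SQL built-ins; the table `t` and column `ts` here are hypothetical:

```sql
-- hypothetical table t with a TIMESTAMP(3) column ts;
-- DAYOFWEEK returns 1 (Sunday) through 7 (Saturday)
SELECT
  TIMESTAMPADD(DAY, 1 - DAYOFWEEK(ts), CAST(ts AS DATE)) AS week_start
FROM t;
```

Grouping by `week_start` then yields per-week buckets without calling FLOOR on a timestamp.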


Re: Compile time checking of SQL

2021-02-20 Thread Sebastián Magrí
I mean having the SQL queries validated when I run `mvn compile` (or any
target that triggers it), so that basic syntax checking happens without
having to submit the job to the cluster.

On Thu, 18 Feb 2021 at 16:17, Timo Walther  wrote:

> Hi Sebastián,
>
> what do you consider as compile time? If you mean some kind of SQL
> editor, you could take a look at Ververica platform (the community
> edition is free):
>
>
> https://www.ververica.com/blog/data-pipelines-with-flink-sql-on-ververica-platform
>
> Otherwise Flink SQL is always validated at (what we call) "pre-flight
> phase". A cluster is not required but it is already JVM runtime of the
> client.
>
> Regards,
> Timo
>
> On 18.02.21 14:55, Sebastián Magrí wrote:
> > Is there any way to check SQL strings in compile time?
> >
> > --
> > Sebastián Ramírez Magrí
>
>

-- 
Sebastián Ramírez Magrí


Re: Cep application with Flink

2021-02-20 Thread Maminspapin
Hello, Jörn Franke.

Thank you for the reply.

If I understand your answer correctly, Flink's watermark mechanism should
help me sort events in the order I need, so I should dig deeper into that topic.

For example, I read three topics, join them, and then get two events for the
same user in this order:

event B (time_event: 12:00) -> event A (time_event: 10:00)

And after watermarking, the situation is different:

A -> B

Can you confirm that I am moving in the right direction?

And another point, about communicating with clients: why would I need a NoSQL
DB? I want to use the Flink CEP library to build a pattern:

- received event A,
- then received event B,
- no event C received within 20 minutes after B

If the pattern completes for a user, I push a record to a special Kafka topic,
and another system sends a notification to the client. That is the picture I
want to get.

Or is there in fact no way to get this result using Flink and the CEP library?

Sorry for disturbing you. I have no experience with Flink at all, and I am
trying to understand how well it fits this task.

Thanks,
Yuri L.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
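One concrete note on the watermark question above: a watermark does not physically reorder the stream; it tells event-time operators (windows, CEP) how long to wait for stragglers before emitting results, so that A (10:00) is accounted for before B (12:00) within the same window. In DDL form this looks like the following sketch (table, column, and connector details are made up for illustration):

```sql
CREATE TABLE user_events (
  user_id    STRING,
  event_type STRING,
  event_time TIMESTAMP(3),
  -- tolerate up to 1 minute of out-of-order arrival
  WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE
) WITH (
  'connector' = 'kafka'
  -- topic, bootstrap servers, and format options omitted
);
```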


Re: Timezone of log timestamps for Flink on Kubernetes

2021-02-20 Thread Yang Wang
You can also set this when building the image.


Best,
Yang

Michael Ran wrote on Fri, Feb 19, 2021 at 7:35 PM:

> It is set in k8s.
> On 2021-02-19 09:37:28, "casel.chen" wrote:
> >The logs are currently in the UTC timezone; how can I set them to local
> >time (UTC+8)? Thanks!
> >
> >2021-02-19 01:34:21,259 INFO  akka.event.slf4j.Slf4jLogger [] - Slf4jLogger started
> >2021-02-19 01:34:22,155 INFO  akka.remote.Remoting [] - Starting remoting
>


Re: How to separate per-job logs in Flink standalone mode?

2021-02-20 Thread Yang Wang
I think you mean standalone session mode, in which one TaskManager runs tasks
of different jobs.

The TaskManager framework logs are all mixed together. If your job classes
live in different packages, you can configure log4j2 with a separate logger
and appender per package to write each one to its own path.


Best,
Yang

xingoo <23603...@qq.com> wrote on Sat, Feb 20, 2021 at 5:31 PM:

> Dear All:
> We currently deploy Flink mainly in standalone mode. How can the logs of
> different jobs be separated within the same TaskManager?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
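The per-package logger/appender idea above could be sketched like this in log4j2 properties syntax (the package name `com.example.joba` and the log path are placeholders):

```properties
# route classes under com.example.joba to their own file
appender.joba.type = File
appender.joba.name = JobAFile
appender.joba.fileName = /opt/flink/log/job-a.log
appender.joba.layout.type = PatternLayout
appender.joba.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

logger.joba.name = com.example.joba
logger.joba.level = INFO
logger.joba.additivity = false
logger.joba.appenderRef.file.ref = JobAFile
```

With `additivity = false`, records from that package go only to the per-job file and no longer clutter the shared TaskManager log.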


Run the code in the UI

2021-02-20 Thread Abu Bakar Siddiqur Rahman Rocky
Hi,

What is the way to run code (from Eclipse or IntelliJ IDEA) so that the job
shows up in the Apache Flink UI?

Thank you!


Re: Flink SQL parallelism question

2021-02-20 Thread Ye Chen
Parallelism settings have priorities: the client-level setting has a lower
priority than the operator-level one. The upstream source operator's
individually configured parallelism therefore takes effect, while the
downstream operators still use the client-level parallelism.

On 2021-02-20 18:23:52, "guaishushu1...@163.com" wrote:
> I have spent the last few days reading the source code that translates a
> Flink table program into stream nodes, and found that an operator's
> parallelism is derived from its upstream operator's parallelism.
> However, in testing I found that with a window aggregate statement the
> operator's parallelism does not match the upstream source; it matches the
> parallelism configured on the CLI instead. Why is that?
>
>
> guaishushu1...@163.com


Re: Cep application with Flink

2021-02-20 Thread Jörn Franke
You are working in a distributed system so event ordering by time may not be 
sufficient (or most likely not). Due to network delays, devices offline etc it 
can happen that an event arrives much later although it happened before. Check 
watermarks in flink and read on at least once, mostly once and exactly once 
delivery guarantees.

How do you plan to connect the mobile app? If you for example have a 
notification for the mobile app. Here you have different choices - event Bus 
architectures (see stomp) or a (nosql) database (see also lambda architecture).

> On 20.02.2021 at 08:24, Люльченко Юрий Николаевич wrote:
> 
> 
> Hi there.
>  
> I'm a newbie with Flink, and I need to develop a CEP application using this 
> technology. At least, we are choosing a tool now. So my question is how well 
> Flink really fits my goals.
>  
> Business requirements:
>  
> There is a mobile application. Data from it is collected in Kafka topics (which 
> have multiple partitions). There are three topics, and they do not contain 
> ready-made per-user events. First I need to join all the data from the topics, 
> and only then do we have a good, clean user event.
>  
> The order of events per user matters.
>  
> So the rules can be like this: the user does action A, then B, and then for some 
> period does not do action C. If such a sequence is received, the system 
> communicates with this user through different channels. 
>  
> We don't want to use an external DB, only Flink state.
>  
> My questions are:
>  
> 1. Using Kafka as the input stream of data and assembling a per-user event.
>  
> I think the order of the clean user events will be wrong this way, because the 
> topics are not partitioned by user key and only one topic has this field. So 
> can I reorder these events by the event's time field?
>  
> 2. State of events.
>  
> Can I query the state using SQL syntax? I don't want to iterate over all records 
> of the store to trigger a communication.
> In the case described above (A -> B -> waiting period -> no C -> communication), 
> event B is stored in state. If C is received, the system removes B from the 
> store. We need to query the store and get all B records with B.event_time + 
> waiting_period < now.
> Or can the CEP library do this job with a pattern?
>  
> 3. Maybe this approach to the requirements is not quite right. But again, can 
> Flink help solve this task?
>  
> Thanks,
> Yuri L.


Flink SQL parallelism question

2021-02-20 Thread guaishushu1...@163.com
I have spent the last few days reading the source code that translates a Flink
table program into stream nodes, and found that an operator's parallelism is
derived from its upstream operator's parallelism. However, in testing I found
that with a window aggregate statement the operator's parallelism does not
match the upstream source; it matches the parallelism configured on the CLI
instead. Why is that?


guaishushu1...@163.com


How to separate per-job logs in Flink standalone mode?

2021-02-20 Thread xingoo
Dear All:
We currently deploy Flink mainly in standalone mode. How can the logs of
different jobs be separated within the same TaskManager?



--
Sent from: http://apache-flink.147419.n8.nabble.com/