Re: Statefun: cancel "sendAfter"

2021-02-02 Thread Stephan Pelikan
Hi,

thank you, Gordon, for the clarification. My use case is processing customers' 
business events. Those events are triggered by ourselves or by the customer, 
depending on the current state of the customer's ongoing business use case. 
We need to monitor delayed or missing business events that follow from 
previous events. For example: the customer has to confirm something we did. 
Depending on what it is, the confirmation has to arrive within hours, days or 
even months. If there is a delay, we need to know. But if the customer confirms 
in time, we want to clean up to keep the state small.

I dug a little bit into the code. May I create an issue to discuss my ideas?

Cheers,
Stephan


From: Tzu-Li (Gordon) Tai 
Sent: Wednesday, 3 February 2021 07:58
To: Stephan Pelikan 
Cc: user@flink.apache.org; Igal Shilman 
Subject: Re: Statefun: cancel "sendAfter"

Hi,

You are right, currently StateFun does not support deleting a scheduled delayed 
message.

StateFun supports delayed messages by building on top of two Flink constructs: 
1) registering processing time timers, and 2) buffering the message payload to 
be sent in state.

The delayed messages are kept in the Flink state of the sending operator, and 
timers are registered on the sending operator as well. So technically, there 
doesn't seem to be a blocker for deleting a delayed message and its associated 
timer, if it hasn't been sent yet.

Can you maybe open a JIRA ticket for this, so we have something that tracks it?
Also cc'ing Igal, who might have more comments on whether supporting this makes 
sense.

Cheers,
Gordon

On Wed, Feb 3, 2021 at 3:51 AM Stephan Pelikan 
<stephan.peli...@phactum.at> wrote:
Hi,

I am thinking about using "sendAfter" to implement some kind of timer 
functionality, and I'm wondering whether there really is no way to cancel a 
delayed message.

In my use case it is possible that intermediate events make the delayed message 
obsolete. In some cases the statefun of that certain ID is cleared (clear all 
state variables) and does not exist anymore. In other cases the statefun of 
that ID still exists (and its state). In the latter case I could ignore the 
delayed message, but what about those statefun which do not exist anymore?

Additionally there can be millions of delayed messages which I do not need any 
more and some delays are also hours, days or even months. I don’t want to 
pollute my state with this because it will inflate the size of my checkpoints.

There are no hints in the docs 
(https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/java.html#sending-delayed-messages)
 how those situations are treated. I found in the Flink’s docs that timers of 
keyed processors can be deleted. As far as I know statefuns are based on those 
processors, so I hope that there is something about it. I hope someone can 
clarify what I can expect and how those situations are handled internally.

Thanks,
Stephan


Re: Can the cache created when submitting a Flink job be deleted?

2021-02-02 Thread Robin Zhang
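Hi tison,
Thanks for the pointers. We are currently on Flink 1.10. Testing shows that
clicking "kill" in the top-left corner of the YARN web UI does not trigger the
deletion. Cancelling via the cancel button in the Flink web UI, or stopping the
job the officially recommended way (echo "stop" | ./bin/yarn-session.sh -id
application_Id), does clean up the files once the job stops.
The files that were not cleaned up earlier were left behind because the
application was killed directly from the YARN web UI.

Call stack: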
org.apache.flink.yarn.Utils.deleteApplicationFiles:214
org.apache.flink.yarn.YarnClusterDescriptor.killCluster:403
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run:569

Best,
Robin.


tison wrote
> org/apache/flink/yarn/YarnResourceManagerDriver.java:236
> org/apache/flink/yarn/YarnClusterDescriptor.java:495
> 
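> It should be cleaned up when the job exits or is force-killed; you can check
> whether your version has this logic.
> 
> You could add some logging to see whether it is actually triggered and which
> directory is deleted.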
> 
> Best,
> tison.
> 
> 
> Robin Zhang (vincent2015qdlg@) wrote on Tue, Feb 2, 2021 at 2:37 PM:
> 
>> With Flink 1.12, Flink's dependencies and the job's jar are cached on HDFS,
>> as shown in the screenshot below:
>>
>> <
>> http://apache-flink.147419.n8.nabble.com/file/t447/flink_%E6%8F%90%E4%BA%A4%E6%97%B6%E7%BC%93%E5%AD%98.png>
>>
>>
>>
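>> Since we started using Flink long ago, these directories keep accumulating,
>> and they are not cleaned up automatically even when the job is no longer
>> running. In a simple test, deleting them directly did not affect running
>> jobs or simple state recovery. I don't know whether anything else depends on
>> them, and I hope someone who knows can explain the principle and purpose of
>> this cache and whether it can be deleted.
>> The goal of deleting is to save HDFS space as part of our own optimization;
>> I would also like to understand how this works.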
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink on YARN: the relationship between JobManager and ApplicationMaster

2021-02-02 Thread lp
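Thanks!
What I quoted is the last section, "Background / Internals", of the Flink
1.11.2 documentation, which describes how Flink runs on YARN:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html
That version is fairly recent, so it should be fine; it is also the version we
run in production. I could not find the corresponding content in the 1.12
documentation.
After reading the documentation carefully, I may not fully understand Flink on
YARN, so I still have a few questions:
① In Flink on YARN mode, are the JobManager and the ApplicationMaster two
separate threads running inside the JVM of the same container?
② After submitting a job in Flink on YARN per-job mode, running jps on the
nodes shows two processes, YarnJobClusterEntrypoint and
YarnTaskExecutorRunner. What are these two processes?
③ Is there an up-to-date description for 1.12 of how Flink on YARN works? I
could not find this content on the official site.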





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink cli to upload flink jar and run flink jobs in the Jenkins pipeline

2021-02-02 Thread Yang Wang
I think the Flink client can connect to ZooKeeper through the network load
balancer. The Flink client is not aware of whether it is talking to a network
load balancer or to multiple ZooKeeper server addresses.
After that, the Flink client retrieves the address of the active leader
JobManager via ZooKeeperHAService and submits the job via the REST client.

Best,
Yang


sidhant gupta wrote on Tue, Feb 2, 2021 at 11:14 PM:

> Hi
>
> I have a Flink ECS cluster set up in HA mode using ZooKeeper, with
> 2 jobmanagers, one of which is elected leader through ZooKeeper
> leader election. I have one application load balancer in front of the
> jobmanagers and one network load balancer in front of ZooKeeper.
>
> As per [1], we can provide the ZooKeeper address in the Flink CLI arguments
> and it will upload/submit the jar to the leader jobmanager. But since I am
> using a network load balancer in front of ZooKeeper, I guess it is not able
> to connect to ZooKeeper. Please provide suggestions or a sample command
> for uploading the Flink job jar or running the job.
>
> Is there any way to distinguish between the leader and standby
> jobmanagers in terms of request or response?
>
> Can we use the Flink CLI in Jenkins to upload the jar to the Flink cluster
> and run the jobs?
>
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-way-to-find-the-current-alive-jobmanager-with-HA-mode-zookeeper-td21787.html
>
> Thanks
> Sidhant Gupta
>


Re: Statefun: cancel "sendAfter"

2021-02-02 Thread Tzu-Li (Gordon) Tai
Hi,

You are right, currently StateFun does not support deleting a scheduled
delayed message.

StateFun supports delayed messages by building on top of two Flink
constructs: 1) registering processing time timers, and 2) buffering the
message payload to be sent in state.

The delayed messages are kept in the Flink state of the sending operator,
and timers are registered on the sending operator as well. So technically,
there doesn't seem to be a blocker for deleting a delayed message and its
associated timer, if it hasn't been sent yet.
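
To make the mechanism concrete, here is a minimal plain-Python sketch of the idea described above: payloads buffered in a dict standing in for state, timers kept in a heap, and a cancel operation that removes both. It is only an illustration under stated assumptions. The class DelayedMessageBuffer, its methods and the "token" used to identify a message are all hypothetical names; none of this is StateFun's API or its internal implementation.

```python
import heapq
from typing import Any, Dict, List, Tuple


class DelayedMessageBuffer:
    """Toy model of a sending operator (hypothetical, not StateFun code)."""

    def __init__(self) -> None:
        self._payloads: Dict[str, Any] = {}          # stands in for Flink state
        self._timers: List[Tuple[float, str]] = []   # heap of (fire_time, token)

    def send_after(self, token: str, fire_time: float, payload: Any) -> None:
        """Buffer the payload and register a timer for it."""
        if token in self._payloads:
            raise ValueError(f"token already scheduled: {token}")
        self._payloads[token] = payload
        heapq.heappush(self._timers, (fire_time, token))

    def cancel(self, token: str) -> bool:
        """Delete a not-yet-sent message together with its timer."""
        if token not in self._payloads:
            return False
        del self._payloads[token]
        self._timers = [t for t in self._timers if t[1] != token]
        heapq.heapify(self._timers)
        return True

    def advance_to(self, now: float) -> List[Any]:
        """Fire every timer that is due and return the delivered payloads."""
        delivered = []
        while self._timers and self._timers[0][0] <= now:
            _, token = heapq.heappop(self._timers)
            delivered.append(self._payloads.pop(token))
        return delivered

    def pending(self) -> int:
        """Number of messages still held in state."""
        return len(self._payloads)
```

In this sketch a cancelled message no longer occupies any state, which is the property the use case in this thread asks for.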

Can you maybe open a JIRA ticket for this, so we have something that tracks
it?
Also cc'ing Igal, who might have more comments on whether supporting this
makes sense.

Cheers,
Gordon


On Wed, Feb 3, 2021 at 3:51 AM Stephan Pelikan 
wrote:

> Hi,
>
>
>
> I am thinking about using "sendAfter" to implement some kind of timer
> functionality, and I'm wondering whether there really is no way to cancel a
> delayed message.
>
>
>
> In my use case it is possible that intermediate events make the delayed
> message obsolete. In some cases the statefun of that certain ID is cleared
> (clear all state variables) and does not exist anymore. In other cases the
> statefun of that ID still exists (and its state). In the latter case I
> could ignore the delayed message, but what about those statefun which do
> not exist anymore?
>
>
>
> Additionally there can be millions of delayed messages which I do not need
> any more and some delays are also hours, days or even months. I don’t want
> to pollute my state with this because it will inflate the size of my
> checkpoints.
>
>
>
> There are no hints in the docs (
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/java.html#sending-delayed-messages)
> how those situations are treated. I found in the Flink’s docs that timers
> of keyed processors can be deleted. As far as I know statefuns are based on
> those processors, so I hope that there is something about it. I hope
> someone can clarify what I can expect and how those situations are handled
> internally.
>
>
>
> Thanks,
>
> Stephan
>


Re: Question on Flink and Rest API

2021-02-02 Thread Ejaskhan S
Yes Gordon, it has certainly given me a starting point to think about.

On Wed, Feb 3, 2021, 12:02 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> There is no out-of-the-box Flink source/sink connector for this, but it isn't
> unheard of that users have implemented something to support what you
> outlined.
>
> One way to possibly achieve this is: in terms of a Flink streaming job
> graph, what you would need to do is co-locate the source (which exposes the
> endpoint and maintains a pool of open client connections mapped by request
> ID) and the sink operators (which receive processed results with the
> original request IDs attached, and are in charge of replying to the
> original requests). The open client connections need to be process-wide
> accessible (e.g. via a static reference), so that when a co-located sink
> operator receives a result, it can directly fetch the corresponding client
> connection and return a response.
>
> The specifics are of course a bit more involved; you will probably need to
> dig around previous Flink Forward conference talks to get a better picture.
> Hopefully this gives you a starting point to think about.
>
> Cheers,
> Gordon
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Question on Flink and Rest API

2021-02-02 Thread Ejaskhan S
Hi Raghavendar,

Yes, you are right. Your approach is correct, and it is the most
straightforward one. But I was just thinking about the possibilities raised by
my question.

Thanks
EK

On Wed, Feb 3, 2021, 12:02 PM Raghavendar T S 
wrote:

> Hi Ejaskhan
>
> As per my understanding, this approach will require your data source to
> run an HTTP server within itself (embedded web server), and I am not sure if
> it is a good design. It looks like you are trying to build a synchronous
> (client-server) processing model in Flink. But Flink is meant for processing
> data asynchronously (streaming/batch). Can you elaborate on the use case you
> are trying to address? What kind of processing are you planning to do in
> Flink?
>
> Check if the following approach makes sense for your use case.
> Accept the events from the HTTP client, persist them to a data store
> (SQL/NoSQL), and publish them to Kafka (topic) / RabbitMQ (queue). Let the
> Flink data source listen to these topics/queues and update the status
> (SUCCESS/FAILURE) in the data store. If your clients are OK with seeing some
> lag in events, you can publish the events directly to Kafka (topic) /
> RabbitMQ (queue) without persisting them in the data store. Let Flink do all
> the processing and finally write to the data store.
>
> Thank you
> Raghavendar T S
> https://www.linkedin.com/in/raghavendar-ts
>
> On Wed, Feb 3, 2021 at 11:29 AM Ejaskhan S  wrote:
>
>> Team,
>>
>> It's just a random thought.
>>
>> Can I make the Flink application exposing a rest endpoint for the data
>> source? So a client could send data to this endpoint. Subsequently, Flink
>> processed this data and responded to the client application through the
>> endpoint, like a client-server model.
>>
>> Thanks
>> *EK*
>>
>>
>>
>
> --
> Raghavendar T S
> www.teknosrc.com
>


Re: Question on Flink and Rest API

2021-02-02 Thread Tzu-Li (Gordon) Tai
Hi,

There is no out-of-the-box Flink source/sink connector for this, but it isn't
unheard of that users have implemented something to support what you
outlined.

One way to possibly achieve this is: in terms of a Flink streaming job
graph, what you would need to do is co-locate the source (which exposes the
endpoint and maintains a pool of open client connections mapped by request
ID) and the sink operators (which receive processed results with the
original request IDs attached, and are in charge of replying to the original
requests). The open client connections need to be process-wide accessible
(e.g. via a static reference), so that when a co-located sink operator
receives a result, it can directly fetch the corresponding client connection
and return a response.

The specifics are of course a bit more involved; you will probably need to dig
around previous Flink Forward conference talks to get a better picture.
Hopefully this gives you a starting point to think about.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Question on Flink and Rest API

2021-02-02 Thread Raghavendar T S
Hi Ejaskhan

As per my understanding, this approach will require your data source to run
an HTTP server within itself (embedded web server), and I am not sure if it
is a good design. It looks like you are trying to build a synchronous
(client-server) processing model in Flink. But Flink is meant for processing
data asynchronously (streaming/batch). Can you elaborate on the use case you
are trying to address? What kind of processing are you planning to do in
Flink?

Check if the following approach makes sense for your use case.
Accept the events from the HTTP client, persist them to a data store
(SQL/NoSQL), and publish them to Kafka (topic) / RabbitMQ (queue). Let the
Flink data source listen to these topics/queues and update the status
(SUCCESS/FAILURE) in the data store. If your clients are OK with seeing some
lag in events, you can publish the events directly to Kafka (topic) /
RabbitMQ (queue) without persisting them in the data store. Let Flink do all
the processing and finally write to the data store.
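
The flow suggested above can be sketched in plain Python, with a dict standing in for the data store and a queue.Queue standing in for the Kafka topic or RabbitMQ queue. Everything here is hypothetical: the function names, the PENDING status and the doubling "processing" step are assumptions for illustration, and no Kafka, RabbitMQ or Flink API is involved.

```python
import queue
from typing import Any, Dict

Store = Dict[str, Dict[str, Any]]


def accept_event(event_id: str, payload: str, store: Store, topic: "queue.Queue[str]") -> None:
    """HTTP-facing side: persist the event, then publish its ID."""
    store[event_id] = {"payload": payload, "status": "PENDING"}
    topic.put(event_id)


def process_events(store: Store, topic: "queue.Queue[str]") -> None:
    """Stream-processing side: consume IDs and update the status in the store."""
    while True:
        try:
            event_id = topic.get_nowait()
        except queue.Empty:
            return
        record = store[event_id]
        try:
            record["result"] = int(record["payload"]) * 2   # placeholder processing
            record["status"] = "SUCCESS"
        except ValueError:
            record["status"] = "FAILURE"
```

The client never waits on the processing itself; it polls the store for the status, which is where the lag mentioned above comes from.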

Thank you
Raghavendar T S
https://www.linkedin.com/in/raghavendar-ts

On Wed, Feb 3, 2021 at 11:29 AM Ejaskhan S  wrote:

> Team,
>
> It's just a random thought.
>
> Can I make the Flink application exposing a rest endpoint for the data
> source? So a client could send data to this endpoint. Subsequently, Flink
> processed this data and responded to the client application through the
> endpoint, like a client-server model.
>
> Thanks
> *EK*
>
>
>

-- 
Raghavendar T S
www.teknosrc.com


Re: How can a type that cannot be treated as a POJO be kept from being treated as a GenericType?

2021-02-02 Thread ℡小新的蜡笔不见嘞、
[...] public [...] get/set() [...]
 




Re: Question a possible use can for Iterative Streams.

2021-02-02 Thread Tzu-Li (Gordon) Tai
Hi Marco,

In the ideal setup, enrichment data existing in external databases is
bootstrapped into the streaming job via Flink's State Processor API, and any
follow-up changes to the enrichment data are streamed into the job as a
second union input on the enrichment operator.
For this solution to scale, lookups to the enrichment data need to be by
the same key as the input data, i.e. the enrichment data is co-partitioned
with the input data stream.

I assume you've already thought about whether or not this would work for
your case, as it's a common setup for streaming enrichment.

Otherwise, I believe your brainstorming is heading in the right direction,
in the case that remote database lookups + local caching in state is a must.
I'm personally not familiar with the iterative streams in Flink, but in
general I think it is currently discouraged to use it.

On the other hand, I think using the Stateful Functions [1] programming
abstraction might work here, as it allows arbitrary messaging between
functions and cyclic dataflows.
There's also an SDK that allows you to embed StateFun functions within a
Flink DataStream job [2].

Very briefly, the way you would model this database cache hit / remote
lookup is by implementing a function, e.g. called DatabaseCache.
The function would expect messages of type Lookup(lookupKey), and reply
with a response of Result(lookupKey, value). The abstraction allows you, on
each incoming message, to register state (similar to vanilla Flink), as well
as to register async operations that you use to perform remote database
lookups in case of a cache / state miss. It also provides means for
"timers" in the form of delayed messages sent to itself, if you need
some mechanism for cache invalidation.
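
The message flow described above can be sketched in plain Python. This is not the StateFun SDK: the class shape, the on_message method, the Invalidate message and the ttl parameter are hypothetical, and the remote lookup is done synchronously here for brevity, whereas the text above describes registering it as an async operation.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple, Union


@dataclass(frozen=True)
class Lookup:
    lookup_key: str


@dataclass(frozen=True)
class Result:
    lookup_key: str
    value: str


@dataclass(frozen=True)
class Invalidate:
    """Self-addressed delayed message used as a cache-expiry timer."""
    lookup_key: str


class DatabaseCache:
    def __init__(self, remote_lookup: Callable[[str], str], ttl: float) -> None:
        self._remote_lookup = remote_lookup
        self._ttl = ttl
        self._cache: Dict[str, str] = {}                  # stands in for function state
        self.delayed: List[Tuple[float, Invalidate]] = [] # (fire_time, message to self)

    def on_message(self, message: Union[Lookup, Invalidate], now: float) -> Optional[Result]:
        if isinstance(message, Lookup):
            key = message.lookup_key
            if key not in self._cache:
                # Cache miss: query the remote database and schedule expiry.
                self._cache[key] = self._remote_lookup(key)
                self.delayed.append((now + self._ttl, Invalidate(key)))
            return Result(key, self._cache[key])
        if isinstance(message, Invalidate):
            self._cache.pop(message.lookup_key, None)
            return None
        raise TypeError(f"unexpected message: {message!r}")
```

Here the surrounding runtime is assumed to deliver the entries of `delayed` back to the function once their fire time is reached.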

Hope this provides some direction for you to think about!

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/
[2]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/flink-datastream.html



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Memory usage increases on every job restart resulting in eventual OOMKill

2021-02-02 Thread Lasse Nedergaard
Hi

We had something similar, and our problem was class loader leaks. We used a 
summary log component to reduce logging, but it turned out that it used a 
static object that wasn't released when we got an OOM or a restart. Flink was 
reusing task managers, so the only workaround was to stop the job, wait until 
they were removed, and start again, until we fixed the underlying problem. 

Med venlig hilsen / Best regards
Lasse Nedergaard


> On 3 Feb 2021, at 02:54, Xintong Song wrote:
> 
> 
>> How is the memory measured?
> I meant: which Flink or k8s metric is collected? I'm asking because, depending 
> on which metric is used, the *container memory usage* can be defined 
> differently, e.g. whether mmap memory is included.
> 
> Also, could you share the effective memory configurations for the 
> taskmanagers? You should find something like the following at the beginning 
> of the taskmanager logs.
> 
>> INFO  [] - Final TaskExecutor Memory configuration:
>> INFO  [] -   Total Process Memory:          1.688gb (1811939328 bytes)
>> INFO  [] -     Total Flink Memory:          1.250gb (1342177280 bytes)
>> INFO  [] -       Total JVM Heap Memory:     512.000mb (536870902 bytes)
>> INFO  [] -         Framework:               128.000mb (134217728 bytes)
>> INFO  [] -         Task:                    384.000mb (402653174 bytes)
>> INFO  [] -       Total Off-heap Memory:     768.000mb (805306378 bytes)
>> INFO  [] -         Managed:                 512.000mb (536870920 bytes)
>> INFO  [] -         Total JVM Direct Memory: 256.000mb (268435458 bytes)
>> INFO  [] -           Framework:             128.000mb (134217728 bytes)
>> INFO  [] -           Task:                  0 bytes
>> INFO  [] -           Network:               128.000mb (134217730 bytes)
>> INFO  [] -     JVM Metaspace:               256.000mb (268435456 bytes)
>> INFO  [] -     JVM Overhead:                192.000mb (201326592 bytes)
> 
> Thank you~
> Xintong Song
> 
> 
>> On Tue, Feb 2, 2021 at 8:59 PM Randal Pitt  wrote:
>> Hi Xintong Song,
>> 
>> Correct, we are using standalone k8s. Task managers are deployed as a
>> statefulset so have consistent pod names. We tried using native k8s (in fact
>> I'd prefer to) but got persistent
>> "io.fabric8.kubernetes.client.KubernetesClientException: too old resource
>> version: 242214695 (242413759)" errors which resulted in jobs being
>> restarted every 30-60 minutes.
>> 
>> We are using Prometheus Node Exporter to capture memory usage. The graph
>> shows the metric:
>> 
>> sum(container_memory_usage_bytes{container_name="taskmanager",pod_name=~"$flink_task_manager"})
>> by (pod_name)
>> 
>> I've attached the original so Nabble doesn't shrink it.
>> 
>> Best regards,
>> 
>> Randal.
>> 
>> 
>> 
>> 
>> 
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Question on Flink and Rest API

2021-02-02 Thread Ejaskhan S
Team,

It's just a random thought.

Can I make a Flink application expose a REST endpoint as its data source, so
that a client can send data to this endpoint? Flink would then process this
data and respond to the client application through the same endpoint, like a
client-server model.

Thanks
*EK*


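Re: How can a type that cannot be treated as a POJO be kept from being treated as a GenericType?

2021-02-02 Thread 赵一旦
As I understand it, Flink requires the class to be public, and every field must
either be public or have a getter/setter. Nested fields are probably checked
recursively as well.

℡小新的蜡笔不见嘞、 <1515827...@qq.com> wrote on Wed, Feb 3, 2021 at 1:52 PM:

> Hello, could we implement this by wrapping the LinkedHashMap-based class?
> That is, if you need PojoSerializer to serialize the data.
>
>
>
>
> -- Original message --
> From: "赵一旦" Sent: Wednesday, February 3, 2021, 1:24 PM
> To: "user-zh" Subject: How can a type that cannot be treated as a POJO be
> kept from being treated as a GenericType?
>
>
>
> As the subject says: Flink's definition of a POJO feels rather strict.
>
> I have a class that extends LinkedHashMap, and it gets treated as a
> GenericType. In this situation I cannot modify the LinkedHashMap
> implementation, and I also cannot easily avoid extending it, because my
> entity is dynamically extensible: the number of attributes is not known in
> advance, so it has to be deserialized from JSON into a Map type.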


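Re: How can a type that cannot be treated as a POJO be kept from being treated as a GenericType?

2021-02-02 Thread ℡小新的蜡笔不见嘞、
Hello, could we implement this by wrapping the LinkedHashMap-based class?
That is, if you need PojoSerializer to serialize the data.

-- Original message --
From: "赵一旦"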

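How can a type that cannot be treated as a POJO be kept from being treated as a GenericType?

2021-02-02 Thread 赵一旦
As the subject says: Flink's definition of a POJO feels rather strict.
I have a class that extends LinkedHashMap, and it gets treated as a
GenericType. In this situation I cannot modify the LinkedHashMap
implementation, and I also cannot easily avoid extending it, because my entity
is dynamically extensible: the number of attributes is not known in advance,
so it has to be deserialized from JSON into a Map type.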


PyFlink How to set timeout for UDF

2021-02-02 Thread 苗红宾
Hi:

Hope you are doing well!

My UDF always runs for a long time, so I'm wondering how to set a timeout for 
a UDF in PyFlink, in order to automatically stop the execution when it has 
been running for too long.

Many Thanks!





 





 

Re: Error testing a pandas UDF: AttributeError: 'decimal.Decimal' object has no attribute 'isnull'

2021-02-02 Thread Xingbo Huang
Hi,

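The error occurs because the logic of your function actually has
aggregate-function semantics, not scalar-function semantics.
A scalar function is one-in, one-out: the number of outputs must match the
number of inputs. A pandas UDF merely takes advantage of pandas' batch
processing and hands you the data wrapped in a batch Series, but you still
have to keep the number of outputs equal to the number of inputs. For example,
if the input is pd.Series([1,2,3]) and you add 1, the result is
pd.Series([2,3,4]); both Series have the same length, 3.
Your function, however, aggregates an entire pd.Series into a single result:
for the input pd.Series([1,2,3]), your return value is 6. That is many-in,
one-out semantics. For this case you should use a pandas UDAF. Pandas UDAFs
are supported starting with release-1.12; see the documentation [1] for
details.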

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
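
The difference between the two semantics can be shown without PyFlink or pandas. The sketch below uses plain Python lists of Decimal in place of a pd.Series batch; the function names plus_one and compound_return are made up for this illustration, and compound_return mirrors the product-of-(rate + 1) logic from the question.

```python
from decimal import Decimal
from functools import reduce
from operator import mul
from typing import List


def plus_one(batch: List[Decimal]) -> List[Decimal]:
    """Scalar semantics: n values in, n values out."""
    return [x + 1 for x in batch]


def compound_return(batch: List[Decimal]) -> Decimal:
    """Aggregate semantics: n values in, a single value out."""
    return reduce(mul, (x + 1 for x in batch), Decimal(1)) - 1


rates = [Decimal("0.1"), Decimal("0.2")]
print(len(plus_one(rates)) == len(rates))   # scalar: the length is preserved
print(compound_return(rates))               # aggregate: one Decimal for the whole batch
```

A function shaped like compound_return returns a bare Decimal rather than a batch, which matches the 'decimal.Decimal' object in the reported AttributeError.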

Best,
Xingbo

肖越 <18242988...@163.com> wrote on Wed, Feb 3, 2021 at 11:50 AM:

> # Define the calculation logic function
> @udf(input_types=DataTypes.DECIMAL(38,18,True),
>      result_type=DataTypes.DECIMAL(38,18,True), udf_type="pandas")
> def multi_production(yldrate):
>     yldrate_1 = yldrate + 1
>     return np.prod(yldrate_1) - 1
>
>
> Invocation: env.sql_query('SELECT multi_production(YLDRATE) FROM query_result')
> Since I could not find a more detailed example on the official site: inside a
> pandas-type UDF, can the data be processed in pandas style?
> [Error message]:
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
> at
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:345)
> at
> org.apache.flink.python.AbstractPythonFunctionRunner.finishBundle(AbstractPythonFunctionRunner.java:230)
> ... 17 more
> Caused by: java.lang.RuntimeException: Error received from SDK harness for
> instruction 2: Traceback (most recent call last):
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py",
> line 167, in _execute
> response = task()
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py",
> line 223, in 
> lambda: self.create_worker().do_instruction(request), request)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py",
> line 352, in do_instruction
> request.instruction_id)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py",
> line 386, in process_bundle
> bundle_processor.process_bundle(instruction_id))
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\bundle_processor.py",
> line 812, in process_bundle
> data.transform_id].process_encoded(data.data)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\bundle_processor.py",
> line 205, in process_encoded
> self.output(decoded_value)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\operations.py",
> line 304, in output
> cython.cast(Receiver,
> self.receivers[output_index]).receive(windowed_value)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\operations.py",
> line 178, in receive
> self.consumer.process(windowed_value)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\fn_execution\operations.py",
> line 92, in process
> self._value_coder_impl.encode_to_stream(self.func(o.value),
> output_stream, True)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\fn_execution\coder_impl.py",
> line 467, in encode_to_stream
> self._value_coder.encode_to_stream(value, out, nested)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\fn_execution\coder_impl.py",
> line 438, in encode_to_stream
> pandas_to_arrow(self._schema, self._timezone, self._field_types, cols))
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\utils.py",
> line 35, in pandas_to_arrow
> schema.types[i]) for i in range(0, len(schema))]
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\utils.py",
> line 35, in 
> schema.types[i]) for i in range(0, len(schema))]
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\utils.py",
> line 27, in create_array
> return pa.Array.from_pandas(s, mask=s.isnull(), type=t)
> AttributeError: 'decimal.Decimal' object has no attribute 'isnull'
>
>
> at
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
> at
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
> at
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249)
> at
> 

flink clickhouse connector

2021-02-02 Thread 阿华田
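I am using Alibaba's ClickHouse connector and get an error saying the factory
cannot be found. Has anyone run into this?
Besides Alibaba's connector, is there currently any other way to write to
ClickHouse from Flink SQL?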




org.apache.flink.table.api.ValidationException: Could not find any factory for 
identifier 'clickhouse' that implements 
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.


阿华田
a15733178...@163.com



Re: Integrating Flink jobs with our own system platform

2021-02-02 Thread 冯嘉伟
Hi

It can be broken down into two steps:
1. Generate the JobGraph; see toJobGraph() in
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils
2. Submit the JobGraph to YARN; see deployJobCluster() in
org.apache.flink.yarn.YarnClusterDescriptor
Note: this applies to 1.11.x


Jacob wrote
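> I have a vague requirement and am not sure whether it is reasonable.
> 
> Currently our real-time computing jobs all run on a Hadoop cluster in on-YARN
> mode. Every time we submit a new job, we do it from the Flink client with
> ./bin/flink run-application -t yarn-application ...
> 
> We now have a self-developed data processing platform in which the Flink job
> is one stage of the processing. We are wondering whether we could add a menu
> to our system's portal for uploading the Flink project's jar and submitting
> the job to the Hadoop cluster, giving us integrated management without having
> to go to a Flink client for every submission. Is this a reasonable
> requirement?
> 
> I suppose that if we submit jobs from our own platform, we would first have
> to integrate the Flink client into our system; otherwise, how would the job
> be started?
> 
> The requirement is rather vague; please bear with me.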
> 
> 
> 
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Checkpoint succeeds but restoring from the checkpoint fails. I am using Guava's BloomFilter; can anyone help analyze this?

2021-02-02 Thread 赵一旦
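@Kezhu Wang Hi.
I have run into many serialization problems lately. The one above is because
LongAdder is not public; simply overriding the class does solve it.

But I have also hit many Kryo serialization problems. For example, when I stop
a job (stop -p), it reports an error the moment the savepoint succeeds and then
enters the restarting state.
The error comes from Kryo: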
2021-02-03 11:00:54
com.esotericsoftware.kryo.KryoException: Unable to find class: eU
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(
DefaultClassResolver.java:138)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(
MapSerializer.java:135)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(
MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
.deserialize(KryoSerializer.java:346)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
.deserialize(PojoSerializer.java:411)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate
.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.
SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
SpillingAdaptiveSpanningRecordDeserializer.java:92)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
.processInput(StreamTwoInputProcessor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: eU
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at org.apache.flink.util.FlinkUserCodeClassLoader
.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
at org.apache.flink.util.ChildFirstClassLoader
.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(
FlinkUserCodeClassLoader.java:49)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.flink.runtime.execution.librarycache.
FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(
FlinkUserCodeClassLoaders.java:168)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(
DefaultClassResolver.java:136)
... 22 more

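From what I can see this is fairly complicated, so I would like to ask about
the custom type-serializer approach you mentioned. It is not clear to me which
of the SimpleVersionedSerializer/avro/protobuf-based implementations can serve
as a general drop-in replacement for Flink's existing one (for example,
something like JSON, where serialization and deserialization work without
manually specifying any schema definition), and which are meant for special
handling of specific objects (for example, where serialization and
deserialization must be implemented by hand).

Also, are there any simple examples?

Kezhu Wang wrote on Sun, Jan 31, 2021 at 3:09 PM: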

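> For a custom state type-serializer, you can try
> SimpleVersionedSerializer/avro/protobuf, etc.
>
> For complex state you should avoid reflection-based serializers as far as
> possible; the same goes for Java Serializable. None of these handle state
> upgrades well.
>
> On January 31, 2021 at 11:29:25, 赵一旦 (hinobl...@gmail.com) wrote:
>
> Does anyone know about this problem?
> I found the cause: it should be that the LongAdder class is not public. But
> is there any solution other than overriding the class? Does Flink itself have
> any way of handling this situation?
>
> After all, the classes one needs are not always user-defined, so there is no
> guarantee that they follow particular rules. Take the non-public LongAdder in
> Guava here: overriding the class does solve it for me, but I don't know
> whether there are other ways.
>
>
> 赵一旦 wrote on Thu, Jan 28, 2021 at 6:03 PM:
>
> > As shown below, I use Guava's BloomFilter, which appears to be serialized
> > with Kryo. The checkpoint succeeds, but restoring the job from the
> > checkpoint fails.
> > The stack trace is below. The key error seems to be that some member with
> > the public modifier cannot be accessed?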
> >
> > java.lang.Exception: Exception while creating StreamOperatorStateContext.
> > at org.apache.flink.streaming.api.operators.
> > StreamTaskStateInitializerImpl.streamOperatorStateContext(
> > StreamTaskStateInitializerImpl.java:235)
> > at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> > .initializeState(AbstractStreamOperator.java:248)
> > at org.apache.flink.streaming.runtime.tasks.OperatorChain
> > .initializeStateAndOpenOperators(OperatorChain.java:400)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask
> > .lambda$beforeInvoke$2(StreamTask.java:507)
> > at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
> > .runThrowing(StreamTaskActionExecutor.java:47)
> > at 

Error when testing a pandas udf: AttributeError: 'decimal.Decimal' object has no attribute 'isnull'

2021-02-02 Thread 肖越
# Define the calculation function

@udf(input_types=DataTypes.DECIMAL(38,18,True),
     result_type=DataTypes.DECIMAL(38,18,True), udf_type="pandas")
def multi_production(yldrate):
    yldrate_1 = yldrate + 1
    return np.prod(yldrate_1) - 1


Call: env.sql_query('SELECT multi_production(YLDRATE) FROM query_result')
I could not find a more detailed example on the official site. Inside a pandas-type udf, can the data be processed in the usual pandas style?
[Error message]:
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
at 
org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:345)
at 
org.apache.flink.python.AbstractPythonFunctionRunner.finishBundle(AbstractPythonFunctionRunner.java:230)
... 17 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for 
instruction 2: Traceback (most recent call last):
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py",
 line 167, in _execute
response = task()
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py",
 line 223, in 
lambda: self.create_worker().do_instruction(request), request)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py",
 line 352, in do_instruction
request.instruction_id)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py",
 line 386, in process_bundle
bundle_processor.process_bundle(instruction_id))
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\bundle_processor.py",
 line 812, in process_bundle
data.transform_id].process_encoded(data.data)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\bundle_processor.py",
 line 205, in process_encoded
self.output(decoded_value)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\operations.py",
 line 304, in output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\operations.py",
 line 178, in receive
self.consumer.process(windowed_value)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\fn_execution\operations.py",
 line 92, in process
self._value_coder_impl.encode_to_stream(self.func(o.value), output_stream, 
True)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\fn_execution\coder_impl.py",
 line 467, in encode_to_stream
self._value_coder.encode_to_stream(value, out, nested)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\fn_execution\coder_impl.py",
 line 438, in encode_to_stream
pandas_to_arrow(self._schema, self._timezone, self._field_types, cols))
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\utils.py",
 line 35, in pandas_to_arrow
schema.types[i]) for i in range(0, len(schema))]
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\utils.py",
 line 35, in 
schema.types[i]) for i in range(0, len(schema))]
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\utils.py",
 line 27, in create_array
return pa.Array.from_pandas(s, mask=s.isnull(), type=t)
AttributeError: 'decimal.Decimal' object has no attribute 'isnull'


at 
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
at 
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249)
at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:297)
at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:738)
at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at 
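A note on the traceback above: the final frame calls s.isnull() on the value returned by the udf, which suggests the function handed back a single scalar where a pandas.Series was expected. The sketch below is plain pandas and numpy, not PyFlink code. It uses floats instead of Decimal for simplicity, and the two function names are made up for illustration. It only shows the difference between returning a scalar and returning a Series with one value per input row.

```python
import numpy as np
import pandas as pd


def multi_production_scalar(yldrate: pd.Series):
    """Mirrors the posted function body: collapses the whole batch to one scalar."""
    return np.prod(yldrate + 1) - 1


def multi_production_series(yldrate: pd.Series) -> pd.Series:
    """Same arithmetic, repeated so the output has one value per input row."""
    total = np.prod(yldrate + 1) - 1
    return pd.Series([total] * len(yldrate), index=yldrate.index)


batch = pd.Series([0.10, 0.20, -0.05])
scalar_result = multi_production_scalar(batch)
series_result = multi_production_series(batch)

# A scalar has no isnull() method, while a Series does.
print(hasattr(scalar_result, "isnull"))
print(hasattr(series_result, "isnull"))
```

Repeating the product on every row may not be what is wanted. If a single aggregated value per group is the goal, a scalar pandas udf may simply be the wrong tool for it, so treat this only as an illustration of the shape mismatch.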

How can I get the input element type of a function inside a RichFunction?

2021-02-02 Thread 赵一旦
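As the subject says: how can I get the input element type (its TypeInformation) inside a RichFunction?
At the moment this information is encapsulated in the transformation and does not seem to be available at the function level.
I need it inside the function. If it could be obtained there, I could avoid passing one extra parameter.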


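Re: pyflink1.11 problem printing udf results: Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb

2021-02-02 Thread 肖越
Sorry, I misread the error message. The problem is solved now, thanks a lot.

At 2021-02-03 10:23:32, "肖越" <18242988...@163.com> wrote:
>I am testing a udf on pyflink 1.11: it takes one Double column of a table as input and computes the cumulative product of that column,
>returning a single Double value. I have already configured taskmanager.memory.task.off-heap.size, but it made no difference.
>Printing the result fails with: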
>Traceback (most recent call last):
>  File "C:*/udtf_test.py", line 42, in 
>env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) 
> FROM query_result')
>  File 
> "C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py",
>  line 543, in execute_sql
>return TableResult(self._j_tenv.executeSql(stmt))
>  File 
> "C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
>  line 1286, in __call__
>answer, self.gateway_client, self.target_id, self.name)
>  File 
> "C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
>  line 154, in deco
>raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
>pyflink.util.exceptions.TableException: "The configured Task Off-Heap Memory 0 
>bytes is less than the least required Python worker Memory 79 mb. The Task 
>Off-Heap Memory can be configured using the configuration key 
>'taskmanager.memory.task.off-heap.size'."
>
>
>[Code follows]:
>s_env = StreamExecutionEnvironment.get_execution_environment()
>s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
># s_env.set_parallelism(8)
>env = StreamTableEnvironment.create(
>    s_env,
>    environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
>env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '0m')
># Register the source tables
>env.execute_sql(get_table_ddl('TP_GL_DAY'))
>env.execute_sql(get_table_ddl('TS_PF_SEC_YLDRATE'))
>
>
># Register the output table
>out_ddl = '''
>CREATE TABLE print_result (
> yldrate1 DOUBLE
>) WITH (
> 'connector' = 'print'
>)
>'''
>env.execute_sql(out_ddl)
># Define and run the SQL
>log_query = ("SELECT BIZ_DATE, YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN "
>             "TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '1234' ")
>view_table = env.sql_query(log_query)
>env.register_table('query_result', view_table)
>
>
># Define the calculation function
>@udf(input_types=DataTypes.DOUBLE(), result_type=DataTypes.DOUBLE(),
>     udf_type="pandas")
>def multi_production(yldrate):
>    yldrate_1 = yldrate + 1
>    return np.prod(yldrate_1) - 1
>
>
># Register the function
>env.register_function('multi_production', multi_production)
>env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) '
>                'FROM query_result')
>query_result.print_schema()
>env.execute('my_udf_job')
>



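Re: pyflink1.11 problem printing udf results: Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb

2021-02-02 Thread Xingbo Huang
Hi,
The error message says at least 79m is required, but your code sets it to 0m, so of course it keeps failing.
Best,
Xingbo

On Wed, Feb 3, 2021 at 10:24 AM 肖越 <18242988...@163.com> wrote:

> I am testing a udf on pyflink 1.11: it takes one Double column of a table as input and computes the cumulative product of that column,
> returning a single Double value. I have already configured taskmanager.memory.task.off-heap.size, but it made no difference.
> Printing the result fails with: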
> Traceback (most recent call last):
>   File "C:*/udtf_test.py", line 42, in 
> env.execute_sql('INSERT INTO print_result SELECT
> multi_production(YLDRATE) FROM query_result')
>   File
> "C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py",
> line 543, in execute_sql
> return TableResult(self._j_tenv.executeSql(stmt))
>   File
> "C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
> line 1286, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File
> "C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
> line 154, in deco
> raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
> pyflink.util.exceptions.TableException: "The configured Task Off-Heap
> Memory 0 bytes is less than the least required Python worker Memory 79 mb.
> The Task Off-Heap Memory can be configured using the configuration key
> 'taskmanager.memory.task.off-heap.size'."
>
>
> [Code follows]:
> s_env = StreamExecutionEnvironment.get_execution_environment()
> s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> # s_env.set_parallelism(8)
> env = StreamTableEnvironment.create(
>     s_env,
>     environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
> env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '0m')
> # Register the source tables
> env.execute_sql(get_table_ddl('TP_GL_DAY'))
> env.execute_sql(get_table_ddl('TS_PF_SEC_YLDRATE'))
>
>
> # Register the output table
> out_ddl = '''
> CREATE TABLE print_result (
>  yldrate1 DOUBLE
> ) WITH (
>  'connector' = 'print'
> )
> '''
> env.execute_sql(out_ddl)
> # Define and run the SQL
> log_query = ("SELECT BIZ_DATE, YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY "
>              "JOIN TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '1234' ")
> view_table = env.sql_query(log_query)
> env.register_table('query_result', view_table)
>
>
> # Define the calculation function
> @udf(input_types=DataTypes.DOUBLE(), result_type=DataTypes.DOUBLE(),
>      udf_type="pandas")
> def multi_production(yldrate):
>     yldrate_1 = yldrate + 1
>     return np.prod(yldrate_1) - 1
>
>
> # Register the function
> env.register_function('multi_production', multi_production)
> env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) '
>                 'FROM query_result')
> query_result.print_schema()
> env.execute('my_udf_job')
>
>


Re: LEAD/LAG functions

2021-02-02 Thread Patrick Angeles
Thanks, Jark.

On Mon, Feb 1, 2021 at 11:50 PM Jark Wu  wrote:

> Yes. RANK/ROW_NUMBER is not allowed with ROW/RANGE over window,
> i.e. the "ROWS BETWEEN 1 PRECEDING AND CURRENT ROW" clause.
>
> Best,
> Jark
>
> On Mon, 1 Feb 2021 at 22:06, Timo Walther  wrote:
>
>> Hi Patrick,
>>
>> I could imagine that LEAD/LAG are translated into RANK/ROW_NUMBER
>> operations that are not supported in this context.
>>
>> But I will loop in @Jark who might know more about the limitations here.
>>
>> Regards,
>> Timo
>>
>>
>> On 29.01.21 17:37, Patrick Angeles wrote:
>> > Another (hopefully newbie) question. Trying to use LEAD/LAG over window
>> > functions. I get the following error. The exact same query works
>> > properly using FIRST_VALUE instead of LEAD.
>> >
>> > Thanks in advance...
>> >
>> > - Patrick
>> >
>> > Flink SQL> describe l1_min ;
>> >
>> > +-----------+------------------------+------+-----+--------+-----------+
>> > | name      | type                   | null | key | extras | watermark |
>> > +-----------+------------------------+------+-----+--------+-----------+
>> > | symbol    | STRING                 | true |     |        |           |
>> > | t_start   | TIMESTAMP(3) *ROWTIME* | true |     |        |           |
>> > | ask_price | DOUBLE                 | true |     |        |           |
>> > | bid_price | DOUBLE                 | true |     |        |           |
>> > | mid_price | DOUBLE                 | true |     |        |           |
>> > +-----------+------------------------+------+-----+--------+-----------+
>> > 5 rows in set
>> >
>> >
>> > Flink SQL> SELECT
>> > >   symbol,
>> > >   t_start,
>> > >   ask_price,
>> > >   bid_price,
>> > >   mid_price,
>> > >   LEAD (mid_price) OVER x AS prev_price
>> > > FROM l1_min
>> > > WINDOW x AS (
>> > >   PARTITION BY symbol
>> > >   ORDER BY t_start
>> > >   ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
>> > > )
>> > > ;
>> >
>> > *[ERROR] Could not execute SQL statement. Reason:*
>> >
>> > *org.apache.calcite.sql.validate.SqlValidatorException: ROW/RANGE not
>> > allowed with RANK, DENSE_RANK or ROW_NUMBER functions*
>> >
>>
>>
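A side note on the query in this thread: it aliases LEAD(mid_price) as prev_price, whereas in standard SQL it is LAG that returns the value from the previous row and LEAD the value from the next one. The sketch below is plain Python, not Flink code, and the helper name lag_by_partition and the sample rows are made up for illustration. It only spells out what LAG(mid_price) OVER (PARTITION BY symbol ORDER BY t_start) is expected to compute, without any ROWS/RANGE frame clause.

```python
from collections import defaultdict


def lag_by_partition(rows, key, order_by, value):
    """Attach 'prev': the previous row's value within each partition, ordered by order_by."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[key]].append(row)

    out = []
    for part in partitions.values():
        prev = None  # the first row of a partition has no predecessor (SQL NULL)
        for row in sorted(part, key=lambda r: r[order_by]):
            out.append({**row, "prev": prev})
            prev = row[value]
    return out


rows = [
    {"symbol": "A", "t_start": 1, "mid_price": 10.0},
    {"symbol": "B", "t_start": 1, "mid_price": 20.0},
    {"symbol": "A", "t_start": 2, "mid_price": 11.0},
    {"symbol": "A", "t_start": 3, "mid_price": 12.0},
]
result = lag_by_partition(rows, "symbol", "t_start", "mid_price")
for row in result:
    print(row)
```

Since the offset of LAG/LEAD is relative to the current row rather than to a frame, the frame clause adds nothing to the function's meaning, which is consistent with the validator rejecting it as described in the reply above.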


Re: Problem connecting pyflink to Kerberos-secured Kafka

2021-02-02 Thread Wei Zhong
Hi,

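For the first problem, it looks like the matching KDC realm cannot be found with your current configuration. You could try setting it manually with System.setProperty, for example:
System.setProperty("java.security.krb5.realm", "");
System.setProperty("java.security.krb5.kdc", "");

For the second problem,
'update-mode'='append' means that the sink only accepts append messages from the upstream operator. It does not mean that files are written in append mode. I think the property you want to configure may be 'format.write-mode'='OVERWRITE'?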


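> On Feb 2, 2021, at 21:17, 瞿叶奇 <389243...@qq.com> wrote:
> 
> Hello!
> Thank you very much for your previous answer. The jaas.conf configuration no longer raises an error, but new problems have appeared. The program is as follows: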
> #!/usr/bin/python3.7
> # -*- coding: UTF-8 -*-
> from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
> from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, 
> CsvTableSink, WriteMode, SqlDialect
> from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json
> from pyflink.java_gateway import get_gateway
> System = get_gateway().jvm.System
> System.setProperty("java.security.auth.login.config", 
> "/opt/client2/Flink/flink/conf/jaas.conf")
> System.setProperty("java.security.krb5.conf", "/root/qyq_user/krb5.conf ")
> s_env = StreamExecutionEnvironment.get_execution_environment()
> s_env.set_parallelism(1)
> s_env.enable_checkpointing(3)
> st_env = StreamTableEnvironment.create(s_env, TableConfig())
> st_env.use_catalog("default_catalog")
> st_env.use_database("default_database")
> st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect",
>  
> "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002").property("security.protocol",
>  'SASL_PLAINTEXT').property("sasl.kerberos.service.name", 
> 'kafka').property("kerberos.domain.name", 
> 'hadoop.hadoop.com').property('group.id' 
> ,'example-group1').property("bootstrap.servers", 
> "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id",
>  DataTypes.BIGINT()),DataTypes.FIELD("name", 
> DataTypes.STRING())]))).with_schema(Schema().field("id", 
> DataTypes.BIGINT()).field("name", 
> DataTypes.STRING())).create_temporary_table("sourceKafka")
> fieldNames = ["id", "name"]
> fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()]
> csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",", 1, 
> WriteMode.OVERWRITE)
> st_env.register_table_sink("csvTableSink", csvSink)
> resultQuery = st_env.sql_query("select id,name from sourceKafka")
> resultQuery = resultQuery.insert_into("csvTableSink")
> st_env.execute("pyflink-kafka-v4")
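> New problem 1):
> 
> I searched on Baidu but found no similar case with a working solution.
> New problem 2):
> After creating the HDFS sink table, it can only be written once. After that it fails with a "file already exists" error, shown below:
> <619c3...@682a4270.ce501960.jpg>
> The table is created as follows:
> CREATE TABLE csvTableSink ( id BIGINT,name STRING) WITH ('connector.path'= 
> 'hdfs://hacluster/flink/qyq_qyq13','connector.type'='filesystem','format.type'='csv','update-mode'
>  = 'append')
> What do I need to change so that new data is appended to the file?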
> 
> 



Re: Question

2021-02-02 Thread Abu Bakar Siddiqur Rahman Rocky
Hi,

Is there any source code for the checkpoints, snapshot and zookeeper
mechanism?

Thank you

On Mon, Feb 1, 2021 at 4:23 AM Chesnay Schepler  wrote:

> Could you expand a bit on what you mean? Are you referring to *savepoints*
> ?
>
> On 1/28/2021 3:24 PM, Abu Bakar Siddiqur Rahman Rocky wrote:
>
> Hi,
>
> Is there any library to use and remember the apache flink snapshot?
>
> Thank you
>
> --
> Regards,
> Abu Bakar Siddiqur Rahman
>
>
>
>

-- 
Regards,
Abu Bakar Siddiqur Rahman
Graduate Research Student
Natural Language Processing Laboratory
Centro de Investigacion en Computacion
Instituto Politecnico Nacional, Mexico City


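pyflink1.11 problem printing udf results: Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb

2021-02-02 Thread 肖越
I am testing a udf on pyflink 1.11: it takes one Double column of a table as input and computes the cumulative product of that column,
returning a single Double value. I have already configured taskmanager.memory.task.off-heap.size, but it made no difference.
Printing the result fails with: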
Traceback (most recent call last):
  File "C:*/udtf_test.py", line 42, in 
env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) 
FROM query_result')
  File 
"C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py",
 line 543, in execute_sql
return TableResult(self._j_tenv.executeSql(stmt))
  File 
"C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
 line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
 line 154, in deco
raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: "The configured Task Off-Heap Memory 0 
bytes is less than the least required Python worker Memory 79 mb. The Task 
Off-Heap Memory can be configured using the configuration key 
'taskmanager.memory.task.off-heap.size'."


[Code follows]:
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
# s_env.set_parallelism(8)
env = StreamTableEnvironment.create(
    s_env,
    environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '0m')
# Register the source tables
env.execute_sql(get_table_ddl('TP_GL_DAY'))
env.execute_sql(get_table_ddl('TS_PF_SEC_YLDRATE'))


# Register the output table
out_ddl = '''
CREATE TABLE print_result (
 yldrate1 DOUBLE
) WITH (
 'connector' = 'print'
)
'''
env.execute_sql(out_ddl)
# Define and run the SQL
log_query = ("SELECT BIZ_DATE, YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN "
             "TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '1234' ")
view_table = env.sql_query(log_query)
env.register_table('query_result', view_table)


# Define the calculation function
@udf(input_types=DataTypes.DOUBLE(), result_type=DataTypes.DOUBLE(),
     udf_type="pandas")
def multi_production(yldrate):
    yldrate_1 = yldrate + 1
    return np.prod(yldrate_1) - 1


# Register the function
env.register_function('multi_production', multi_production)
env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) '
                'FROM query_result')
query_result.print_schema()
env.execute('my_udf_job')



Re: Memory usage increases on every job restart resulting in eventual OOMKill

2021-02-02 Thread Xintong Song
>
> How is the memory measured?

I meant which flink or k8s metric is collected? I'm asking because
depending on which metric is used, the *container memory usage* can be
defined differently. E.g., whether mmap memory is included.

Also, could you share the effective memory configurations for the
taskmanagers? You should find something like the following at the
beginning of the taskmanager logs.

> INFO  [] - Final TaskExecutor Memory configuration:
> INFO  [] -   Total Process Memory:          1.688gb (1811939328 bytes)
> INFO  [] -     Total Flink Memory:          1.250gb (1342177280 bytes)
> INFO  [] -       Total JVM Heap Memory:     512.000mb (536870902 bytes)
> INFO  [] -         Framework:               128.000mb (134217728 bytes)
> INFO  [] -         Task:                    384.000mb (402653174 bytes)
> INFO  [] -       Total Off-heap Memory:     768.000mb (805306378 bytes)
> INFO  [] -         Managed:                 512.000mb (536870920 bytes)
> INFO  [] -         Total JVM Direct Memory: 256.000mb (268435458 bytes)
> INFO  [] -           Framework:             128.000mb (134217728 bytes)
> INFO  [] -           Task:                  0 bytes
> INFO  [] -           Network:               128.000mb (134217730 bytes)
> INFO  [] -     JVM Metaspace:               256.000mb (268435456 bytes)
> INFO  [] -     JVM Overhead:                192.000mb (201326592 bytes)
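For anyone reading such a log for the first time, the figures in the sample above add up level by level. The short check below uses only the byte counts from that sample. The variable names are made up, and reading the two entries under "Total JVM Direct Memory" as framework and task off-heap memory is an assumption about the log layout.

```python
# Byte counts copied from the sample log above.
framework_heap = 134217728
task_heap = 402653174
managed = 536870920
framework_off_heap = 134217728   # "Framework" under Total JVM Direct Memory (assumed)
task_off_heap = 0                # "Task" under Total JVM Direct Memory (assumed)
network = 134217730
metaspace = 268435456
overhead = 201326592

jvm_heap = framework_heap + task_heap
jvm_direct = framework_off_heap + task_off_heap + network
off_heap = managed + jvm_direct
total_flink = jvm_heap + off_heap
total_process = total_flink + metaspace + overhead

for name, value in [
    ("Total JVM Heap Memory", jvm_heap),
    ("Total JVM Direct Memory", jvm_direct),
    ("Total Off-heap Memory", off_heap),
    ("Total Flink Memory", total_flink),
    ("Total Process Memory", total_process),
]:
    print(f"{name}: {value} bytes ({value / 2**20:.3f} MiB)")
```

Comparing the "Total Process Memory" figure with the container limit and with whatever the container metric reports is usually the first step when a pod is OOM-killed.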


Thank you~

Xintong Song



On Tue, Feb 2, 2021 at 8:59 PM Randal Pitt  wrote:

> Hi Xintong Song,
>
> Correct, we are using standalone k8s. Task managers are deployed as a
> statefulset so have consistent pod names. We tried using native k8s (in
> fact I'd prefer to) but got persistent
> "io.fabric8.kubernetes.client.KubernetesClientException: too old resource
> version: 242214695 (242413759)" errors which resulted in jobs being
> restarted every 30-60 minutes.
>
> We are using Prometheus Node Exporter to capture memory usage. The graph
> shows the metric:
>
>
> sum(container_memory_usage_bytes{container_name="taskmanager",pod_name=~"$flink_task_manager"})
> by (pod_name)
>
> I've attached the original
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2869/Screenshot_2021-02-02_at_11.png>
> so Nabble doesn't shrink it.
>
> Best regards,
>
> Randal.
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


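Re: How to read a mysql datetime field in flink 1.12

2021-02-02 Thread macdoor
I confirmed by experiment that this is caused by upgrading to MySQL JDBC Driver 8.0.23. After going back to MySQL JDBC Driver
8.0.22 the problem disappears. I have filed an issue: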

https://issues.apache.org/jira/browse/FLINK-21240



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Question about a possible use case for Iterative Streams.

2021-02-02 Thread Marco Villalobos
Hi everybody,

I am brainstorming how it might be possible to perform database enrichment
with the DataStream API, use keyed state for caching, and also utilize
Async IO.

Since Async I/O does not support keyed state, is it possible to use an
Iterative Stream that uses keyed state for caching in the main body, and
uses feedback to fetch cache misses with Async I/O?

I hope this diagram conveys my idea.

[image: image.png]

I am thinking of leveraging an Iterative Stream in this manner, but I am
not quite sure how Iterative Streams work since the example is not clear to
me.
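To make the idea concrete outside of Flink, the control flow can be sketched in plain asyncio: serve hits from a cache, fetch only the misses asynchronously, then fill the cache. This is not Flink code and says nothing about whether an Iterative Stream is the right vehicle. The dict stands in for keyed state, and fetch_from_db, enrich and the "enriched-" values are hypothetical placeholders.

```python
import asyncio

calls = []  # records which keys actually reached the "database"


async def fetch_from_db(key):
    """Stand-in for an asynchronous database lookup (hypothetical)."""
    calls.append(key)
    await asyncio.sleep(0)  # yield control, as a real I/O call would
    return f"enriched-{key}"


async def enrich(events, cache):
    """Serve hits from the cache; fetch all misses concurrently, then fill the cache."""
    misses = sorted({e for e in events if e not in cache})
    fetched = await asyncio.gather(*(fetch_from_db(k) for k in misses))
    cache.update(zip(misses, fetched))
    return [(e, cache[e]) for e in events]


cache = {}  # plays the role of keyed state in this sketch
first = asyncio.run(enrich(["a", "b", "a"], cache))
second = asyncio.run(enrich(["b", "c"], cache))
print(first)
print(second)
print(calls)
```

In the second batch only "c" is fetched, because "b" is already cached. In a Flink job the open question from the message above remains how the fetched values get back to the operator that owns the keyed state.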


Re: Conflicts between the JDBC and postgresql-cdc SQL connectors

2021-02-02 Thread Sebastián Magrí
The root of the previous error seemed to be the flink version the connector
was compiled for. I've tried compiling my own postgresql-cdc connector, but
still have some issues with dependencies.

On Thu, 28 Jan 2021 at 11:24, Sebastián Magrí  wrote:

> Applied that parameter and that seems to get me some progress here.
>
> I still get the shade overlapping classes warning, but I get the
> PostgreSQLTableFactory in the merged table.factories.Factory service file.
>
> However, now on runtime the application fails to find the debezium source
> function class coming down to this error:
>
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
> Cannot load user class:
> com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction
> ClassLoader info: URL ClassLoader:
> Class not resolvable through given classloader.
>
> The class is indeed in the jar, though.
>
> Any thoughts?
>
> On Thu, 28 Jan 2021 at 09:57, Jark Wu  wrote:
>
>> Hi Sebastián,
>>
>> Could you try to add combine.children="append" attribute to the
>> transformers configuration?
>> You can also see the full shade plugin configuration here [1].
>>
>> Best,
>> Jark
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/#transform-table-connectorformat-resources
>>
>> On Thu, 28 Jan 2021 at 17:28, Sebastián Magrí 
>> wrote:
>>
>>> Hi Jark!
>>>
>>> Please find the full pom file attached.
>>>
>>> Best Regards,
>>>
>>> On Thu, 28 Jan 2021 at 03:21, Jark Wu  wrote:
>>>
 Hi Sebastián,

 I think Dawid is right.

 Could you share the pom file? I also tried to
 package flink-connector-postgres-cdc with ServicesResourceTransformer, and
 the Factory file contains

 com.alibaba.ververica.cdc.connectors.postgres.table.PostgreSQLTableFactory


 Best,
 Jark


 On Tue, 26 Jan 2021 at 21:17, Sebastián Magrí 
 wrote:

> Thanks a lot for looking into it Dawid,
>
> In the
> src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
> file I only see
>
> org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory
>
> Even after applying the ServicesResourceTransformer.
>
>
> On Tue, 26 Jan 2021 at 11:58, Dawid Wysakowicz 
> wrote:
>
>> Hi,
>>
>> Unfortunately I am not familiar with the packaging of
>> flink-connector-postgres-cdc. Maybe @Jark could help here?
>>
>> However, I think the problem that you cannot find the connector is
>> caused because of lack of entry in the resulting Manifest file. If there
>> are overlapping classes maven does not exclude whole dependencies, but
>> rather picks the overlapping class from one of the two. Could you check 
>> if
>> you see entries for all tables in
>> src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory.
>>
>> If not, you could try applying the ServicesResourceTransformer[1]
>>
>> Best,
>>
>> Dawid
>>
>> [1]
>> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
>> On 26/01/2021 12:29, Sebastián Magrí wrote:
>>
>> Hi!
>>
>> I've reported an issue with the postgresql-cdc connector apparently
>> caused by the maven shade plugin excluding either the JDBC connector or 
>> the
>> cdc connector due to overlapping classes. The issue for reference is 
>> here:
>>
>> https://github.com/ververica/flink-cdc-connectors/issues/90
>>
>> In the meantime, however, I've been trying to figure out if I can set
>> up an exclusion rule to fix this in my pom.xml file, without success.
>>
>> The `org.postgresql:postgresql` dependency is being added manually by
>> me to have a sink on a postgresql table and injected by the cdc connector
>> seemingly via its debezium connector dependency.
>>
>> Any guidance or hints I could follow would be really appreciated.
>>
>> --
>> Sebastián Ramírez Magrí
>>
>>
>
> --
> Sebastián Ramírez Magrí
>

>>>
>>> --
>>> Sebastián Ramírez Magrí
>>>
>>
>
> --
> Sebastián Ramírez Magrí
>


-- 
Sebastián Ramírez Magrí


Re: [Flink SQL] CompilerFactory cannot be cast error when executing statement

2021-02-02 Thread Sebastián Magrí
Hi Timo!

I've been building my jobs instead of using the binaries to avoid this
issue, hence I've not looked at this again. But I'd say it's still an issue
since nothing in the setup has changed in the meantime.

Thanks!

On Tue, 2 Feb 2021 at 08:51, Timo Walther  wrote:

> Hi Sebastian,
>
> sorry for the late reply. Could you solve the problem in the meantime?
> It definitely looks like a dependency conflict.
>
> Regards,
> Timo
>
>
> On 22.01.21 18:18, Sebastián Magrí wrote:
> > Thanks a lot Matthias!
> >
> > In the meantime I'm trying out something with the scala quickstart.
> >
> >
> > On Fri, 22 Jan 2021 at 17:12, Matthias Pohl wrote:
> >
> > Ok, to be fair, I just did some research on the error message and
> > didn't realize that you're working with binaries only.
> >
> > I tried to set it up on my machine to be able to reproduce your
> > error. Unfortunately, I wasn't able to establish the connection
> > between Flink and Postgres using your docker-compose.yml.
> > I'm going to cc Timo. Maybe, he has a guess what's causing this
> > error.
> >
> > Best,
> > Matthias
> >
> > On Fri, Jan 22, 2021 at 4:35 PM Sebastián Magrí
> > <sebasma...@gmail.com> wrote:
> >
> > Hi Matthias!
> >
> > I went through that thread but as I'm just using the
> > `apache/flink` docker image for testing I honestly couldn't
> > figure out how I would do that since I don't have a pom file to
> > edit. If it's possible to do it through the configuration I'd be
> > glad if you could point me out in the right direction.
> >
> > Pretty evident I don't have a lot of experience with mvn or
> > "modern" Java in general.
> >
> > :-)
> >
> > Thanks!
> >
> > On Fri, 22 Jan 2021 at 15:19, Matthias Pohl
> > <matth...@ververica.com> wrote:
> >
> > Hi Sebastián,
> > have you tried changing the dependency scope to provided
> > for flink-table-planner-blink as it is suggested in [1]?
> >
> > Best,
> > Matthias
> >
> > [1]
> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-1-10-exception-Unable-to-instantiate-java-compiler-td38221.html
> >
> > On Fri, Jan 22, 2021 at 4:04 PM Sebastián Magrí
> > <sebasma...@gmail.com> wrote:
> >
> > Hi!
> >
> > I'm trying out Flink SQL with the attached
> > docker-compose file.
> >
> > It starts up and then I create a table with the
> > following statement:
> >
> > CREATE TABLE mytable_simple (
> >   `customer_id` INT
> > ) WITH (
> >   'connector' = 'jdbc',
> >   'url' = 'jdbc:postgresql://pgusr:pgpwd@postgres/pdgb',
> >   'table-name' = 'mytable'
> > );
> >
> > However when I try to run this:
> >
> > select * from mytable_simple;
> >
> > I get the following error in the client:
> >
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.ClassCastException:
> > org.codehaus.janino.CompilerFactory cannot be cast to
> > org.codehaus.commons.compiler.ICompilerFactory
> >
> > At first I thought it could be an incompatibility issue
> > with the libraries I was putting in, like the
> > postgres-cdc library version, but even after leaving
> > only the JDBC libraries in I still get the same error.
> >
> > It'd be great if you could give me some pointers here.
> >
> > Thanks!
> >
> > --
> > Sebastián Ramírez Magrí
> >
> >
> >
> > --
> > Sebastián Ramírez Magrí
> >
> >
> >
> >
> >
> > --
> > Sebastián Ramírez Magrí
>
>

-- 
Sebastián Ramírez Magrí


Statefun: cancel "sendAfter"

2021-02-02 Thread Stephan Pelikan
Hi,

I am thinking about using "sendAfter" to implement some kind of timer functionality. 
I'm wondering whether there is really no way to cancel a delayed message.

In my use case it is possible that intermediate events make the delayed message 
obsolete. In some cases the statefun of that certain ID is cleared (clear all 
state variables) and does not exist anymore. In other cases the statefun of 
that ID still exists (and its state). In the latter case I could ignore the 
delayed message, but what about those statefun which do not exist anymore?

Additionally there can be millions of delayed messages which I do not need any 
more and some delays are also hours, days or even months. I don't want to 
pollute my state with this because it will inflate the size of my checkpoints.

The docs 
(https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/java.html#sending-delayed-messages)
give no hint about how those situations are treated. I found in Flink's docs that timers of 
keyed processors can be deleted. As far as I know, statefuns are based on those 
processors, so I hope something similar is possible here. I hope someone can 
clarify what I can expect and how those situations are handled internally.

Thanks,
Stephan
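
If a delayed message cannot be cancelled, one workaround for making obsolete messages harmless is to tag each delayed message with a token and ignore any message whose token no longer matches the stored one. The sketch below is plain Python, not the StateFun SDK: `ConfirmationTracker`, `TimeoutMessage`, `arm`, `confirm` and `on_timeout` are hypothetical names, and the object only stands in for the persisted state of one function instance. It covers both cases from the message above (state still present, or state cleared), but it does not remove the buffered message from the sender's state, so it does not help with checkpoint size.

```python
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TimeoutMessage:
    """Payload of the delayed message (hypothetical type)."""
    token: str


class ConfirmationTracker:
    """Stand-in for the persisted state of one function instance (hypothetical)."""

    def __init__(self) -> None:
        # None means "no confirmation pending"; this is also what a freshly
        # created or fully cleared instance looks like.
        self.expected_token: Optional[str] = None

    def arm(self) -> TimeoutMessage:
        """Start waiting: store a fresh token and return the message to send delayed."""
        self.expected_token = uuid.uuid4().hex
        return TimeoutMessage(self.expected_token)

    def confirm(self) -> None:
        """The customer confirmed in time: clear the state."""
        self.expected_token = None

    def on_timeout(self, message: TimeoutMessage) -> bool:
        """Return True only if the delayed message still matches the pending wait."""
        return (
            self.expected_token is not None
            and message.token == self.expected_token
        )


tracker = ConfirmationTracker()
first = tracker.arm()      # delayed message for the first wait
tracker.confirm()          # confirmed in time, state cleared
second = tracker.arm()     # a later wait on the same ID
stale = tracker.on_timeout(first)     # old message arrives late
current = tracker.on_timeout(second)  # message for the pending wait
```

Random tokens are used instead of a counter because a counter stored in the function's state would restart after the state is cleared, and a late message from before the clear could then match a token issued afterwards.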


Re: Flink Datadog Timeout

2021-02-02 Thread Chesnay Schepler
The reported exception looks quite similar to the one in this thread, 
which was supposedly caused by Datadog rate limits, but I don't think 
this was thoroughly investigated.
(bear in mind that each container has its own reporter; with the default 
reporting interval of 10 seconds you quickly reach fairly high 
reports/second rates)


Alternatively it could just be plain connectivity issues.

If the issues do not persist for a long time then no metrics should be 
lost, however, so you may be able to ignore them.
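
The remark about report rates can be made concrete with a back-of-the-envelope calculation. This is a rough sketch that assumes every container's reporter sends exactly one request per reporting interval; the container counts are made-up examples, and `reports_per_second` is a hypothetical helper, not part of Flink.

```python
def reports_per_second(num_containers: int, interval_seconds: float = 10.0) -> float:
    """Aggregate report rate, assuming one report per container per interval."""
    if num_containers < 0:
        raise ValueError("num_containers must not be negative")
    if interval_seconds <= 0:
        raise ValueError("interval_seconds must be positive")
    return num_containers / interval_seconds


# Hypothetical cluster sizes with the default 10 second interval.
small_cluster = reports_per_second(20)
large_cluster = reports_per_second(200)
# The same large cluster with a 60 second interval.
large_cluster_slow = reports_per_second(200, interval_seconds=60.0)
```

Under these assumptions the rate grows linearly with the number of containers and shrinks linearly with the interval, so raising the interval (Flink's `metrics.reporter.<name>.interval` option) is one way to test whether rate limiting is involved.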



On 2/2/2021 7:31 PM, Claude M wrote:


Hello,

I have a Flink jobmanager and taskmanagers deployed in a Kubernetes 
cluster.  I integrated it with Datadog by having the following 
specified in the flink-conf.yaml.


metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: 

However, I'm seeing random timeouts in the log and don't know why this 
is occurring and how to solve the issue.  Please see attached file 
showing the error.



Thanks








Flink Datadog Timeout

2021-02-02 Thread Claude M
Hello,

I have a Flink jobmanager and taskmanagers deployed in a Kubernetes
cluster.  I integrated it with Datadog by having the following specified in
the flink-conf.yaml.

metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: 

However, I'm seeing random timeouts in the log and don't know why this is
occurring and how to solve the issue.   Please see attached file showing
the error.


Thanks
WARN  org.apache.flink.metrics.datadog.DatadogHttpClient - Failed sending request to Datadog
java.net.SocketTimeoutException: timeout
at org.apache.flink.shaded.okhttp3.internal.http2.Http2Stream$StreamTimeout.newTimeoutException(Http2Stream.java:593)
at org.apache.flink.shaded.okhttp3.internal.http2.Http2Stream$StreamTimeout.exitAndThrowIfTimedOut(Http2Stream.java:601)
at org.apache.flink.shaded.okhttp3.internal.http2.Http2Stream.takeResponseHeaders(Http2Stream.java:146)
at org.apache.flink.shaded.okhttp3.internal.http2.Http2Codec.readResponseHeaders(Http2Codec.java:120)
at org.apache.flink.shaded.okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:75)
at org.apache.flink.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at org.apache.flink.shaded.okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
at org.apache.flink.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at org.apache.flink.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at org.apache.flink.shaded.okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
at org.apache.flink.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at org.apache.flink.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at org.apache.flink.shaded.okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at org.apache.flink.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at org.apache.flink.shaded.okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
at org.apache.flink.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at org.apache.flink.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at org.apache.flink.shaded.okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
at org.apache.flink.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:135)
at org.apache.flink.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Re: Very slow recovery from Savepoint

2021-02-02 Thread Robert Metzger
Hey Yordan,

have you checked the log files from the processes in that cluster?
The JobManager log should give you hints about issues with the coordination
/ scheduling of the job. Could it be something unexpected, like your job
could not start, because there were not enough TaskManagers available?
The TaskManager logs could give you also hints about potential retries etc.

What you could also do is manually sample the TaskManagers (you can access
thread dumps via the web ui) to see what they are doing.

Hope this helps!

On Thu, Jan 28, 2021 at 5:42 PM Yordan Pavlov  wrote:

> Hello there,
> I am trying to find the solution for a problem we are having in our Flink
> setup related to very slow recovery from a Savepoint. I have searched in the
> mailing list, found a somewhat similar problem, the bottleneck there was the
> HD usage, but I am not seeing this in our case. Here is a description of
> what our setup is:
> * Flink 1.11.3
> * Running on top of Kubernetes on dedicated hardware.
> * The Flink job consists of 4 task manager running on separate Kubernetes
> pods along with a Jobmanager also running on separate Pod.
> * We use RocksDB state backend with incremental checkpointing.
> * The size of the savepoint I try to recover is around 35 GB
> * The file system that RocksDB uses is S3, or more precisely a S3
> emulation (Minio), we are not subject to any EBS burst credits and so
> on.
>
> The time it takes for the Flink job to be operational and start consuming
> new records is around 5 hours. During that time I am not seeing any heavy
> resource usage on any of the TaskManager pods. I am attaching a
> screenshot of the resources of one of the Taskmanager pods.
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2957/Flink-pod-start.png
> >
>
> In this graph the job was started at around 14:00 o'clock. There is this
> huge spike shortly after this and then there is not much happening. This
> goes on for around 5 hours after which the job starts, but again working
> quite slowly. What would be the way to profile where the bottleneck
> is? I have checked my network connectivity and I am able to download
> the whole savepoint manually within several minutes. It seems like Flink
> is very slow to build its internal state but then again the CPU is not
> being utilized. I would be grateful for any suggestions on how to
> proceed with this investigation.
>
> Regards,
> Yordan
>
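
The figures in the quoted report can be turned into a rough average throughput, which shows how large the gap between the restore and the manual download is. This is only a sketch: it treats "around 35 GB" and "around 5 hours" as exact, reads "several minutes" as five minutes (an assumption), and uses decimal units; `effective_throughput_mb_per_s` is a hypothetical helper.

```python
def effective_throughput_mb_per_s(size_gb: float, duration_hours: float) -> float:
    """Average throughput in MB/s, using decimal units (1 GB = 1000 MB)."""
    if duration_hours <= 0:
        raise ValueError("duration_hours must be positive")
    return size_gb * 1000.0 / (duration_hours * 3600.0)


# Restore as reported: about 35 GB in about 5 hours.
restore_rate = effective_throughput_mb_per_s(35, 5)
# Manual download, assuming "several minutes" means 5 minutes.
download_rate = effective_throughput_mb_per_s(35, 5 / 60)
ratio = download_rate / restore_rate
```

With these assumptions the restore averages roughly 2 MB/s while the manual download is above 100 MB/s, which supports the report's point that raw network bandwidth is unlikely to be the bottleneck.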


Max with retract aggregate function does not support type: ''CHAR''.

2021-02-02 Thread Yuval Itzchakov
Hi,
I'm trying to use MAX on a field that is statically known from another
table (let's ignore why for a moment). While running the SQL query, I
receive an error:

Max with retract aggregate function does not support type: ''CHAR''.

Looking at the code for creating the max function:

[image: image.png]

It does seem like all primitives are supported. Is there a particular
reason why a CHAR would not be supported? Is this an oversight?
-- 
Best Regards,
Yuval Itzchakov.


Re: Dynamic statefun topologies

2021-02-02 Thread Igal Shilman
Hi Frédérique!

Thank you for your kind words! let me try to answer your questions:

> From the email thread, it looks like there’s going to be support for
> dynamic function dispatch by name patterns which is pretty cool, but it
> sounds like you still need to redeploy if you add a new ingress or egress.
> Is that correct?
>
This is correct.

> Are there plans to support such a dynamic use case? Or is there already a
> way to achieve this that I’m not aware of?
>
There are no plans to support that at the moment, but we would be very
happy to learn more about your use case, and possibly re-prioritize
this.

> For now, we’re considering to generate the yaml dynamically and whenever a
> change is necessary, restart Flink with the new config. We can create a
> savepoint before teardown and resume from it after restart, but that adds
> quite a bit of complexity and potential points of failure. Would this
> approach be a recommended method for achieving this type of dynamic config
> right now?
>
Your suggestion can definitely work, and I think that it really depends on
what you are trying to achieve.
For example: are you trying to add/remove topics to consume from Kafka? or
are you trying to add completely different ingresses? or possibly the
topics that you consume from, do not change, but you would like to change
the routing dynamically?


>
> Alternatively, I also saw that you can deploy jars to the Flink cluster,
> but the code samples all seem to be for JVM functions. Is it possible to
> submit remote function jobs as jars to Flink? If so, how do you do that /
> do you have a link to an example?
>
This is possible with remote functions as well, as long as the module.yaml
can be found in the classpath, or alternatively using the datastream
integration[1].

[1]
https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java#L79,L83

Kind regards,
Igal.


On Tue, Feb 2, 2021 at 11:52 AM Frédérique Mittelstaedt 
wrote:

> Hi!
>
> Thanks for all the great work on both Flink and Statefun. I saw this
> recent email thread (
> https://lists.apache.org/thread.html/re984157869f5efd136cda9d679889e6ba2f132213ae7afff715783e2%40%3Cuser.flink.apache.org%3E
> )
> and we’re looking at a similar problem.
>
> Right now, statefun requires you to specify a yaml config upon start-up
> that sets up all the bindings. If you want to change the config, you
> effectively need to restart your Flink instance.
>
> We’re looking to use Flink statefun with dynamic topologies, i.e. based on
> a config change in another system, we want to create/update/delete
> bindings.
>
> From the email thread, it looks like there’s going to be support for
> dynamic function dispatch by name patterns which is pretty cool, but it
> sounds like you still need to redeploy if you add a new ingress or egress.
> Is that correct?
>
> Are there plans to support such a dynamic use case? Or is there already a
> way to achieve this that I’m not aware of?
>
> For now, we’re considering to generate the yaml dynamically and whenever a
> change is necessary, restart Flink with the new config. We can create a
> savepoint before teardown and resume from it after restart, but that adds
> quite a bit of complexity and potential points of failure. Would this
> approach be a recommended method for achieving this type of dynamic config
> right now?
>
> Alternatively, I also saw that you can deploy jars to the Flink cluster,
> but the code samples all seem to be for JVM functions. Is it possible to
> submit remote function jobs as jars to Flink? If so, how do you do that /
> do you have a link to an example?
>
> Thanks a lot for your help & all the best,
> Frédérique
>
>


Flink cli to upload flink jar and run flink jobs in the Jenkins pipeline

2021-02-02 Thread sidhant gupta
Hi

I have a Flink ECS cluster set up in HA mode using ZooKeeper, where I have
2 jobmanagers, one of which will be elected as leader using ZooKeeper
leader election. I have one application load balancer in front of the
jobmanagers and one network load balancer in front of zookeeper.

As per [1], we can provide the ZooKeeper address in the Flink CLI arguments
and it would upload/submit the jar to the leader jobmanager. But since I am
using a network load balancer in front of ZooKeeper, I guess the CLI is not
able to connect to ZooKeeper. Please provide suggestions or a sample command
for uploading the Flink job jar or running the job.

Is there any way to distinguish between leader and standby
jobmanagers in terms of request or response?

Can we use flink cli in jenkins to upload the jar to the flink cluster and
run the jobs?


[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-way-to-find-the-current-alive-jobmanager-with-HA-mode-zookeeper-td21787.html

Thanks
Sidhant Gupta


Re: Question about the field delimiter for CSV-format data in Kafka

2021-02-02 Thread JasonLee
hi

It is supported in 1.11 as well.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


AbstractMethodError while writing to parquet

2021-02-02 Thread Jan Oelschlegel
Hi all,

I'm using Flink 1.11 with the DataStream API. I would like to write my results 
in Parquet format into HDFS.

Therefore I generated an Avro SpecificRecord with the avro-maven-plugin:




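<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.8.2</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>src/main/resources/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/target/generated-sources/</outputDirectory>
        <stringType>String</stringType>
      </configuration>
    </execution>
  </executions>
</plugin>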






Then I'm using the SpecificRecord in the StreamingFileSink:



val sink: StreamingFileSink[SpecificRecord] = StreamingFileSink
  .forBulkFormat(
    new Path("hdfs://example.com:8020/data/"),
    ParquetAvroWriters.forSpecificRecord(classOf[SpecificRecord])
  )
  .build()


The job cancels with the following error:


java.lang.AbstractMethodError: org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(Lorg/apache/parquet/bytes/BytesInput;IILorg/apache/parquet/column/statistics/Statistics;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/Encoding;)V
at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:53)
at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:315)
at org.apache.parquet.column.impl.ColumnWriteStoreBase.sizeCheck(ColumnWriteStoreBase.java:200)
at org.apache.parquet.column.impl.ColumnWriteStoreBase.endRecord(ColumnWriteStoreBase.java:187)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.endRecord(ColumnWriteStoreV1.java:27)
at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:307)
at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:166)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
at org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:202)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at de.integration_factory.datastream.aggregations.CasesProcessFunction.process(CasesProcessFunction.scala:34)
at de.integration_factory.datastream.aggregations.CasesProcessFunction.process(CasesProcessFunction.scala:11)
at org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
at 

Re: Memory usage increases on every job restart resulting in eventual OOMKill

2021-02-02 Thread Randal Pitt
Hi Xintong Song,

Correct, we are using standalone k8s. Task managers are deployed as a
statefulset so have consistent pod names. We tried using native k8s (in fact
I'd prefer to) but got persistent
"io.fabric8.kubernetes.client.KubernetesClientException: too old resource
version: 242214695 (242413759)" errors which resulted in jobs being
restarted every 30-60 minutes.

We are using Prometheus Node Exporter to capture memory usage. The graph
shows the metric:

sum(container_memory_usage_bytes{container_name="taskmanager",pod_name=~"$flink_task_manager"}) by (pod_name)

I've attached the original so Nabble doesn't shrink it.

Best regards,

Randal.





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Stopping with "stop -p" to create a savepoint fails with a Kryo error

2021-02-02 Thread 赵一旦
As the subject says; the error is as follows.
2021-02-02 20:44:19
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 95
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Recently I have run into quite a few serialization-related errors.

I would like to ask: (1) What causes the error above? (2) Regarding custom serializers, is there any serializer that, like the POJO serializer, does not require the user to specify a schema?
For example, one where you only register an implementation class and do not need to define any schema.


Re: Integration with Apache AirFlow

2021-02-02 Thread Flavio Pompermaier
You're probably right, Chesnay. I just asked on this mailing list to find out
whether there are any pointers or blog posts about this topic from a Flink
perspective.
Then the conversation went in the wrong direction.

Best,
Flavio

On Tue, Feb 2, 2021 at 12:48 PM Chesnay Schepler  wrote:

> I'm sorry, but aren't these questions better suited for the Airflow mailing
> lists?
>
> On 2/2/2021 12:35 PM, Flavio Pompermaier wrote:
>
> Thank you all for the hints. However looking at the REST API[1] of AirFlow
> 2.0 I can't find how to setup my DAG (if this is the right concept).
> Do I need to first create a Connection? A DAG?  a TaskInstance? How do I
> specify the 2 BashOperator?
> I was thinking to connect to AirFlow via Java so I can't use the Python
> API..
>
> [1]
> https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#section/Overview
>
> On Tue, Feb 2, 2021 at 10:53 AM Arvid Heise  wrote:
>
>> Hi Flavio,
>>
>> If you know a bit of Python, it's also trivial to add a new Flink
>> operator where you can use REST API.
>>
>> In general, I'd consider Airflow to be the best choice for your problem,
>> especially if it gets more complicated in the future (do something else if
>> the first job fails).
>>
>> If you have specific questions, feel free to ask.
>>
>> Best,
>>
>> Arvid
>>
>> On Tue, Feb 2, 2021 at 10:08 AM 姜鑫  wrote:
>>
>>> Hi Flavio,
>>>
>>> I probably understand what you need. Apache AirFlow is a scheduling
>>> framework in which you can define your own dependent operators, so you
>>> can define a BashOperator to submit a Flink job to your local Flink cluster.
>>> For example:
>>> ```
>>> t1 = BashOperator(
>>>     task_id='flink-wordcount',
>>>     bash_command='./bin/flink run flink/build-target/examples/batch/WordCount.jar',
>>>     ...
>>> )
>>> ```
>>> Also, Airflow supports submitting jobs to Kubernetes, and you can even
>>> implement your own operator if a bash command doesn’t meet your demands.
>>>
>>> Indeed, Flink AI (flink-ai-extended?) needs an enhanced
>>> version of AirFlow, but it is mainly for streaming scenarios, which means the
>>> job won’t stop. In your case, where all jobs are batch jobs, it doesn’t help much.
>>> Hope this helps.
>>>
>>> Regards,
>>> Xin
>>>
>>>
>>> On Feb 2, 2021, at 4:30 PM, Flavio Pompermaier wrote:
>>>
>>> Hi Xin,
>>> let me state first that I never used AirFlow so I can probably miss some
>>> background here.
>>> I just want to externalize the job scheduling to some consolidated
>>> framework and from what I see Apache AirFlow is probably what I need.
>>> However I can't find any good blog post or documentation about how to
>>> integrate these 2 technologies using REST API of both services.
>>> I saw that Flink AI decided to use a customized/enhanced version of
>>> AirFlow [1] but I didn't look into the code to understand how they use it.
>>> In my use case I just want to schedule 2 Flink batch jobs using the REST
>>> API of AirFlow, where the second one is fired after the first.
>>>
>>> [1]
>>> https://github.com/alibaba/flink-ai-extended/tree/master/flink-ai-flow
>>>
>>> Best,
>>> Flavio
>>>
>>> On Tue, Feb 2, 2021 at 2:43 AM 姜鑫  wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> Could you explain what your direct question is? In my opinion, it is
>>>> possible to define two Airflow operators to submit dependent Flink jobs, as
>>>> long as the first one can reach the end.
>>>>
>>>> Regards,
>>>> Xin
>>>>
>>>> On Feb 1, 2021, at 6:43 PM, Flavio Pompermaier wrote:
>>>>
>>>> Any advice here?
>>>>
>>>> On Wed, Jan 27, 2021 at 9:49 PM Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> Hello everybody,
>>>>> is there any suggested way/pointer to schedule Flink jobs using Apache
>>>>> AirFlow?
>>>>> What I'd like to achieve is the submission (using the REST API of
>>>>> AirFlow) of 2 jobs, where the second one can be executed only if the first
>>>>> one succeeds.
>>>>>
>>>>> Thanks in advance
>>>>> Flavio
>>>>>

>>>
>
>


flink1.10??flink-sql,pyflink????hdfs??sink??????????append????????????????????????

2021-02-02 Thread ??????
??
 ??hdfs??sink??

??
CREATE TABLE csvTableSink (
  id BIGINT,
  name STRING
) WITH (
  'connector.path' = 'hdfs://hacluster/flink/qyq_qyq13',
  'connector.type' = 'filesystem',
  'format.type' = 'csv',
  'update-mode' = 'append'
)


Re: Integration with Apache AirFlow

2021-02-02 Thread Chesnay Schepler
I'm sorry, but aren't these questions better suited for the Airflow 
mailing lists?


On 2/2/2021 12:35 PM, Flavio Pompermaier wrote:
Thank you all for the hints. However looking at the REST API[1] of 
AirFlow 2.0 I can't find how to setup my DAG (if this is the right 
concept).
Do I need to first create a Connection? A DAG?  a TaskInstance? How do 
I specify the 2 BashOperator?
I was thinking to connect to AirFlow via Java so I can't use the 
Python API..


[1] 
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#section/Overview 



On Tue, Feb 2, 2021 at 10:53 AM Arvid Heise wrote:


Hi Flavio,

If you know a bit of Python, it's also trivial to add a new Flink
operator where you can use REST API.

In general, I'd consider Airflow to be the best choice for your
problem, especially if it gets more complicated in the future (do
something else if the first job fails).

If you have specific questions, feel free to ask.

Best,

Arvid

On Tue, Feb 2, 2021 at 10:08 AM 姜鑫 <jiangxin...@gmail.com> wrote:

Hi Flavio,

I probably understand what you need. Apache AirFlow is a
scheduling framework in which you can define your own dependent
operators, so you can define a BashOperator to submit a
Flink job to your local Flink cluster. For example:
```
t1 = BashOperator(
    task_id='flink-wordcount',
    bash_command='./bin/flink run flink/build-target/examples/batch/WordCount.jar',
    ...
)
```
Also, Airflow supports submitting jobs to Kubernetes, and you
can even implement your own operator if a bash command doesn’t
meet your demands.

Indeed, Flink AI (flink-ai-extended?) needs an
enhanced version of AirFlow, but it is mainly for streaming
scenarios, which means the job won’t stop. In your case, where
all jobs are batch jobs, it doesn’t help much. Hope this helps.

Regards,
Xin



On Feb 2, 2021, at 4:30 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Hi Xin,
let me state first that I never used AirFlow so I can
probably miss some background here.
I just want to externalize the job scheduling to some
consolidated framework and from what I see Apache AirFlow is
probably what I need.
However I can't find any good blog post or documentation
about how to integrate these 2 technologies using REST API of
both services.
I saw that Flink AI decided to use a customized/enhanced
version of AirFlow [1] but I didn't look into the code to
understand how they use it.
In my use case I just want to schedule 2 Flink batch jobs
using the REST API of AirFlow, where the second one is fired
after the first.

[1]
https://github.com/alibaba/flink-ai-extended/tree/master/flink-ai-flow


Best,
Flavio

On Tue, Feb 2, 2021 at 2:43 AM 姜鑫 <jiangxin...@gmail.com> wrote:

Hi Flavio,

Could you explain what your direct question is? In my
opinion, it is possible to define two airflow operators
to submit dependent flink job, as long as the first one
can reach the end.

Regards,
Xin


On Feb 1, 2021, at 6:43 PM, Flavio Pompermaier
<pomperma...@okkam.it> wrote:

Any advice here?

On Wed, Jan 27, 2021 at 9:49 PM Flavio Pompermaier
<pomperma...@okkam.it> wrote:

Hello everybody,
is there any suggested way/pointer to schedule Flink
jobs using Apache AirFlow?
What I'd like to achieve is the submission (using
the REST API of AirFlow) of 2 jobs, where the second
one can be executed only if the first one succeed.

Thanks in advance
Flavio









Re: Integration with Apache AirFlow

2021-02-02 Thread Flavio Pompermaier
Thank you all for the hints. However looking at the REST API[1] of AirFlow
2.0 I can't find how to setup my DAG (if this is the right concept).
Do I need to first create a Connection? A DAG?  a TaskInstance? How do I
specify the 2 BashOperator?
I was thinking to connect to AirFlow via Java so I can't use the Python
API..

[1]
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#section/Overview
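
As far as I can tell from the stable REST API, the DAG itself (including the two BashOperator tasks and their ordering) is still defined in a Python file deployed to Airflow; the REST API is then used to trigger runs of that DAG, which any HTTP client, including a Java one, can do. Below is a minimal sketch of such a trigger request, assuming the Airflow 2.0 endpoint `POST /api/v1/dags/{dag_id}/dagRuns`. The host `http://airflow.example.com:8080`, the DAG id `flink_batch_jobs`, the `conf` key and the helper `build_trigger_request` are hypothetical, and authentication is left out because it depends on how the Airflow instance is configured. The request is only built here, not sent.

```python
import json
import urllib.request
from typing import Any, Dict, Optional


def build_trigger_request(
    base_url: str,
    dag_id: str,
    conf: Optional[Dict[str, Any]] = None,
) -> urllib.request.Request:
    """Build (but do not send) a request that triggers one run of a DAG."""
    # "conf" carries run-specific parameters that the DAG's tasks may read.
    body = {"conf": conf or {}}
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical host, DAG id and parameters.
request = build_trigger_request(
    "http://airflow.example.com:8080/",
    "flink_batch_jobs",
    {"input_path": "/data/in"},
)
```

Sending it would be a call to `urllib.request.urlopen(request)` with whatever credentials the deployment requires. Inside the DAG file, the second task is typically made dependent on the first with `t1 >> t2`, so that it only runs if the first one succeeds.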

On Tue, Feb 2, 2021 at 10:53 AM Arvid Heise  wrote:

> Hi Flavio,
>
> If you know a bit of Python, it's also trivial to add a new Flink operator
> where you can use REST API.
>
> In general, I'd consider Airflow to be the best choice for your problem,
> especially if it gets more complicated in the future (do something else if
> the first job fails).
>
> If you have specific questions, feel free to ask.
>
> Best,
>
> Arvid
>
> On Tue, Feb 2, 2021 at 10:08 AM 姜鑫  wrote:
>
>> Hi Flavio,
>>
>> I probably understand what you need. Apache AirFlow is a scheduling
>> framework in which you can define your own dependent operators, so you
>> can define a BashOperator to submit a Flink job to your local Flink cluster.
>> For example:
>> ```
>> t1 = BashOperator(
>>     task_id='flink-wordcount',
>>     bash_command='./bin/flink run flink/build-target/examples/batch/WordCount.jar',
>>     ...
>> )
>> ```
>> Also, Airflow supports submitting jobs to Kubernetes, and you can even
>> implement your own operator if a bash command doesn’t meet your demands.
>>
>> Indeed, Flink AI (flink-ai-extended?) needs an enhanced
>> version of AirFlow, but it is mainly for streaming scenarios, which means the
>> job won’t stop. In your case, where all jobs are batch jobs, it doesn’t help much.
>> Hope this helps.
>>
>> Regards,
>> Xin
>>
>>
>> On Feb 2, 2021, at 4:30 PM, Flavio Pompermaier wrote:
>>
>> Hi Xin,
>> let me state first that I never used AirFlow so I can probably miss some
>> background here.
>> I just want to externalize the job scheduling to some consolidated
>> framework and from what I see Apache AirFlow is probably what I need.
>> However I can't find any good blog post or documentation about how to
>> integrate these 2 technologies using REST API of both services.
>> I saw that Flink AI decided to use a customized/enhanced version of
>> AirFlow [1] but I didn't look into the code to understand how they use it.
>> In my use case I just want to schedule 2 Flink batch jobs using the REST
>> API of AirFlow, where the second one is fired after the first.
>>
>> [1]
>> https://github.com/alibaba/flink-ai-extended/tree/master/flink-ai-flow
>>
>> Best,
>> Flavio
>>
>> On Tue, Feb 2, 2021 at 2:43 AM 姜鑫  wrote:
>>
>>> Hi Flavio,
>>>
>>> Could you explain what your direct question is? In my opinion, it is
>>> possible to define two Airflow operators to submit dependent Flink jobs,
>>> as long as the first one can run to completion.
>>>
>>> Regards,
>>> Xin
>>>
>>> On Feb 1, 2021, at 6:43 PM, Flavio Pompermaier wrote:
>>>
>>> Any advice here?
>>>
>>> On Wed, Jan 27, 2021 at 9:49 PM Flavio Pompermaier 
>>> wrote:
>>>
>>>> Hello everybody,
>>>> is there any suggested way/pointer to schedule Flink jobs using Apache
>>>> Airflow?
>>>> What I'd like to achieve is the submission (using the REST API of
>>>> Airflow) of 2 jobs, where the second one can be executed only if the first
>>>> one succeeds.
>>>>
>>>> Thanks in advance
>>>> Flavio

>>>
>>


Re: Memory usage increases on every job restart resulting in eventual OOMKill

2021-02-02 Thread Xintong Song
Hi Randal,
The image is too blurry to read.
I have a few questions.
- IIUC, you are using the standalone K8s deployment [1], not the native K8s
deployment [2]. Could you confirm that?
- How is the memory measured?

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html



On Tue, Feb 2, 2021 at 7:24 PM Randal Pitt  wrote:

> Hi,
>
> We're running Flink 1.11.3 on Kubernetes. We have a job with parallelism of
> 10 running on 10 task managers each with 1 task slot. The job has 4 time
> windows with 2 different keys, 2 windows have reducers and 2 are processed
> by window functions. State is stored in RocksDB.
>
> We've noticed when a pod is restarted (say if the node it was on is
> restarted) the job restarts and the memory usage of the remaining 9 pods
> increases by roughly 1GB over the next 1-2 hours then stays at that level.
> If another pod restarts the remaining 9 increase in memory usage again.
> Eventually one or more pods reach the 6GB limit and are OOMKilled, leading
> to the job restarting and memory usage increasing again.
>
> If left it can lead to the situation where an OOMKill directly leads to an
> OOMKill which directly leads to another. At this point it requires manual
> intervention to resolve.
>
> I think it's exceedingly likely the excessive memory usage is in RocksDB
> rather than Flink, my question is whether there's anything we can do about
> the increase in memory usage after a failure?
>
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2869/Screenshot_2021-02-02_at_11.png>
>
>
> Best regards,
>
> Randal.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


RE: Event trigger query

2021-02-02 Thread Colletta, Edward
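You can use a one-day tumbling processing-time window with an offset of 13 hours,
adjusted for your time zone (windows are aligned to UTC, so for UTC+8, for
example, the offset is 13 - 8 = 5 hours).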

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#tumbling-windows
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.html
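
The offset arithmetic can be checked with a few lines of plain Python. This is only a sketch: `window_start` follows the same arithmetic Flink uses to assign an element to a tumbling window, while `daily_offset_ms` and its parameters are made-up helper names for illustration.

```python
DAY_MS = 24 * 60 * 60 * 1000
HOUR_MS = 60 * 60 * 1000


def window_start(timestamp_ms, offset_ms, size_ms):
    """Start of the tumbling window that contains timestamp_ms (epoch millis)."""
    return timestamp_ms - (timestamp_ms - offset_ms + size_ms) % size_ms


def daily_offset_ms(local_hour, utc_offset_hours):
    """Window offset so that a one-day window closes at local_hour local time.

    Hypothetical helper: utc_offset_hours is +8 for UTC+8, -5 for UTC-5, etc.
    """
    return ((local_hour - utc_offset_hours) % 24) * HOUR_MS


if __name__ == "__main__":
    ts = 1612260000000  # 2021-02-02 10:00:00 UTC
    start = window_start(ts, daily_offset_ms(13, 0), DAY_MS)
    print("window:", start, "to", start + DAY_MS)
```

With a 13-hour offset and no time zone shift, every window runs from 13:00 UTC to 13:00 UTC the next day, so the daily sum is emitted at 1 PM.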


From: Abhinav Sharma 
Sent: Tuesday, February 2, 2021 5:27 AM
To: user@flink.apache.org
Subject: Event trigger query


Newbie question: How can I set triggers on a stream that fire according to 
system time? E.g., I want to sum the elements of a stream at 1 PM every day.


Re: Flink checkpoint fails: Checkpoint Coordinator is suspending.

2021-02-02 Thread chen310
The Flink version is 1.11, and the checkpoint configuration is:

pipeline.time-characteristic EventTime
execution.checkpointing.interval 60
execution.checkpointing.min-pause 12
execution.checkpointing.timeout 12
execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION
state.backend rocksdb
state.backend.incremental true
state.checkpoints.dir hdfs:///tmp/flink/checkpoint

The complete JM log is very large (over 1 GB); what I pasted above is the key error message.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


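Re: Flink on YARN: the relationship between JobManager and ApplicationMaster

2021-02-02 Thread Xintong Song
Your previous understanding is correct. YARN's AM is Flink's JM.

The documentation you saw is misleading. I checked the git history: the
passage you quoted was written in 2014 and presumably describes the on-YARN
deployment from the project's early days, which is long outdated. It has been
removed from the latest 1.12 documentation.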

Thank you~

Xintong Song



On Tue, Feb 2, 2021 at 6:43 PM lp <973182...@qq.com> wrote:

> In other words: I know that for MapReduce jobs the ApplicationMaster
> implementation is MRAppMaster. For Flink on YARN, what is the corresponding
> ApplicationMaster implementation?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Memory usage increases on every job restart resulting in eventual OOMKill

2021-02-02 Thread Randal Pitt
Hi,

We're running Flink 1.11.3 on Kubernetes. We have a job with parallelism of
10 running on 10 task managers each with 1 task slot. The job has 4 time
windows with 2 different keys, 2 windows have reducers and 2 are processed
by window functions. State is stored in RocksDB.

We've noticed when a pod is restarted (say if the node it was on is
restarted) the job restarts and the memory usage of the remaining 9 pods
increases by roughly 1GB over the next 1-2 hours then stays at that level.
If another pod restarts the remaining 9 increase in memory usage again.
Eventually one or more pods reach the 6GB limit and are OOMKilled, leading
to the job restarting and memory usage increasing again.

If left it can lead to the situation where an OOMKill directly leads to an
OOMKill which directly leads to another. At this point it requires manual
intervention to resolve.

I think it's exceedingly likely that the excessive memory usage is in RocksDB
rather than Flink. My question is whether there's anything we can do about
the increase in memory usage after a failure?


 

Best regards,

Randal.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Running Beam Pipelines on a Flink Application Mode Cluster

2021-02-02 Thread Yang Wang
Hi Jan,

If you can run your Apache Beam application in Flink session mode, then it
could also work in application mode. The key difference in application mode
is that the job submission happens in the JobManager pod, not on the Flink
client side. If you want to use the standalone application on K8s [1], you
need to clean up all the resources (e.g. the JobManager Job and the
TaskManager Deployment) manually after every execution. You could also try
the native Flink application mode on K8s [2], where all the resources are
cleaned up automatically once the job reaches a terminal state.

Please note that you need to ensure your user jars already exist in the
JobManager pod before starting the cluster entrypoint. Using an init
container or baking the user jars into the Docker image would both work.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/standalone/kubernetes.html#deploy-application-cluster
[2].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/native_kubernetes.html#application-mode


Best,
Yang

On Tue, Feb 2, 2021 at 5:04 AM, Jan Bensien wrote:

> Hello,
>
> I am currently trying to run my Apache Beam applications using Flink as my
> backend. Currently I use a session cluster running on Kubernetes. Is it
> possible to run Beam pipelines using the application mode? I would like to
> change to application mode, as I currently benchmark my applications with
> limited execution time and reset my cluster between every execution.
>
>
> With many thanks,
>
> Jan
>


Re: Proctime consistency

2021-02-02 Thread Timo Walther
As far as I know, we support ROW_NUMBER in SQL, which could give you a 
sequence number.


Regarding window semantics, the processing time only determines when to 
trigger the evaluation (also mentioned here [1]). A timer is registered 
for the next evaluation. The window content and next timer is part of 
every checkpoint and savepoint. If you restore from a 
checkpoint/savepoint, the stored next timestamp will be checked with the 
current wall clock and an evaluation might be triggered immediately. 
Thus, usually event-time is more useful than processing time. If you 
have a lot of processing time timers set, they might all fire 
immediately during a restore.
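
To make the restore behaviour concrete, here is a toy model in plain Python. It is not Flink code: the function name and the representation of timers as a list of epoch-millisecond timestamps are assumptions made only for illustration. It shows that restored processing-time timers whose timestamp is at or before the current wall clock fire right away, while later ones stay registered.

```python
def timers_due_on_restore(restored_timers_ms, now_ms):
    """Split restored timers into (fire immediately, still pending).

    Toy model only: a timer is 'due' if its timestamp is not after the
    current wall-clock time at restore.
    """
    due = sorted(t for t in restored_timers_ms if t <= now_ms)
    pending = sorted(t for t in restored_timers_ms if t > now_ms)
    return due, pending


if __name__ == "__main__":
    # Timers were registered for 1 s, 2 s and 5 s; the job comes back at 3 s.
    due, pending = timers_due_on_restore([5000, 1000, 2000], now_ms=3000)
    print("fire now:", due)
    print("still waiting:", pending)
```

The longer the job was down, the more timers end up in the first group, which is the burst of immediate firings described above.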


So the window will not start over from scratch. But inflight data that 
was about to reach the window operator will be reread from the source 
operator.


Timo


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#triggers



On 01.02.21 20:06, Rex Fenley wrote:
We need to aggregate in precisely row order. Is there a safe way to do 
this? Maybe with some sort of row time sequence number?


As written in another email, we're currently doing the following set of 
operations

val compactedUserDocsStream = userDocsStream
  .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .aggregate(new CompactionAggregate())

I guess my concern is that if we restore from a checkpoint or savepoint, I
don't understand how the window gets checkpointed and how window
alignment works between runs of a job. Will the window just start over
from scratch and re-process any rows that may have been in flight but
not finished processing in the previous run's last window?


If so, then I guess everything will arrive in row order like we want it
to. But if a window gets checkpointed with its previous proctime, then
it may be misaligned in the next run and drop rows that were in that window.


On Mon, Feb 1, 2021 at 6:37 AM Timo Walther wrote:


Hi Rex,

processing-time gives you no alignment of operators across nodes. Each
operation works with its local machine clock that might be interrupted
by the OS, Java garbage collector, etc. It is always a best effort
timing.

Regards,
Timo


On 27.01.21 18:16, Rex Fenley wrote:
 > Hello,
 >
 > I'm looking at ways to deduplicate data and found [1], but does proctime
 > get committed with operators? How does this work against clock skew on
 > different machines?
 >
 > Thanks
 >
 > [1]
 > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
 >
 > --
 >
 > Rex Fenley | Software Engineer - Mobile and Backend
 >
 > Remind.com | BLOG | FOLLOW US | LIKE US



--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com | BLOG | FOLLOW US | LIKE US









flink??hdfs ????????kerberos??????

2021-02-02 Thread ??????
Hi??
??flinkkafka??hdfs??

Dynamic statefun topologies

2021-02-02 Thread Frédérique Mittelstaedt
Hi!

Thanks for all the great work on both Flink and Statefun. I saw this recent 
email thread 
(https://lists.apache.org/thread.html/re984157869f5efd136cda9d679889e6ba2f132213ae7afff715783e2%40%3Cuser.flink.apache.org%3E)
 and we’re looking at a similar problem.

Right now, statefun requires you to specify a yaml config upon start-up that 
sets up all the bindings. If you want to change the config, you effectively 
need to restart your Flink instance.

We’re looking to use Flink statefun with dynamic topologies, i.e. based on a 
config change in another system, we want to create/update/delete bindings.

From the email thread, it looks like there’s going to be support for dynamic 
function dispatch by name patterns which is pretty cool, but it sounds like you 
still need to redeploy if you add a new ingress or egress. Is that correct?

Are there plans to support such a dynamic use case? Or is there already a way 
to achieve this that I’m not aware of?

For now, we’re considering generating the yaml dynamically and, whenever a 
change is necessary, restarting Flink with the new config. We can create a 
savepoint before teardown and resume from it after restart, but that adds quite 
a bit of complexity and potential points of failure. Is this the 
recommended way to achieve this type of dynamic config right now?

Alternatively, I also saw that you can deploy jars to the Flink cluster, but 
the code samples all seem to be for JVM functions. Is it possible to submit 
remote function jobs as jars to Flink? If so, how do you do that / do you have 
a link to an example?

Thanks a lot for your help & all the best,
Frédérique



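Re: Flink on YARN: the relationship between JobManager and ApplicationMaster

2021-02-02 Thread lp
In other words: I know that for MapReduce jobs the ApplicationMaster
implementation is MRAppMaster. For Flink on YARN, what is the corresponding
ApplicationMaster implementation?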



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Event trigger query

2021-02-02 Thread Abhinav Sharma
Newbie question: How can I set triggers on a stream that fire according
to system time? E.g., I want to sum the elements of a stream at 1 PM every day.


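Re: Help with using flink-parcel

2021-02-02 Thread lp
Which submission mode does flink-parcel use? Could you describe it in detail?
If per-job mode or application mode is used, each job's Flink cluster is
independent on YARN, and killing one job does not affect another.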



--
Sent from: http://apache-flink.147419.n8.nabble.com/

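Flink on YARN: the relationship between JobManager and ApplicationMaster

2021-02-02 Thread lp
In Flink on YARN, what is the relationship between YARN's ApplicationMaster
and Flink's JobManager? I am not very familiar with YARN; my previous
understanding was that the JobManager plays the role of the ApplicationMaster
in YARN. However, I saw the following excerpt on the official website: "...Once
that has finished, the ApplicationMaster (AM) is started. The JobManager and AM
are running in the same container. Once they successfully started, the AM knows
the address of the JobManager (its own host)." This suggests that the AM and
the JM are two processes, but when I submit a Flink job to the YARN cluster I
only see a JobManager process (YarnJobClusterEntrypoint) and never an
ApplicationMaster process. Could you please explain how they are related and
how they differ?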



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink CheckPoint/Savepoint Behavior Question

2021-02-02 Thread Arvid Heise
Hi Jason,

you got it perfectly right. So everything that is not in an explicit state
(or checkpointed in CheckpointedFunction#snapshotState) is lost on
recovery. However, Flink applications always go through the complete
life-cycle.

Note that I'd look into CheckpointedFunction if the side-information that
you fetch from S3 is not changing and rather small.
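
The distinction between explicit state and everything else can be illustrated with a toy model in plain Python. This is not Flink API: the class, the method names (which only mirror the operator life-cycle) and the placeholder standing in for the S3 load are all made up for illustration.

```python
class ToyOperator:
    """Toy stand-in for an operator; not Flink API."""

    def __init__(self):
        self.side_data = None  # loaded in open(), never checkpointed
        self.seen = 0          # explicit state, included in snapshots
        self.scratch = []      # plain in-memory field, lost on recovery

    def initialize_state(self, snapshot):
        # Runs on every start; snapshot is None on a fresh start.
        self.seen = snapshot["seen"] if snapshot else 0

    def open(self):
        # Runs on every start, including after a restore.
        self.side_data = {"loaded": True}  # placeholder for the S3 load

    def process(self, record):
        self.seen += 1
        self.scratch.append(record)

    def snapshot_state(self):
        return {"seen": self.seen}


def start(snapshot=None):
    """Go through the complete life-cycle: restore state, then open."""
    op = ToyOperator()
    op.initialize_state(snapshot)
    op.open()
    return op


if __name__ == "__main__":
    op = start()
    op.process("a")
    op.process("b")
    recovered = start(op.snapshot_state())  # simulated failure and recovery
    print(recovered.seen, recovered.scratch, recovered.side_data)
```

After the simulated recovery, only `seen` carries over from the snapshot; `scratch` is empty again and `side_data` is present only because `open()` ran a second time.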

Best,

Arvid

On Tue, Feb 2, 2021 at 5:42 AM Raghavendar T S 
wrote:

> Flink is aware of all the tasks running in the cluster. If any of the
> tasks fails, the failed task is restored using the checkpoint (only if the
> task uses Flink operator state). This scenario does not use savepoints.
> Savepoints are the same as checkpoints; the difference is that
> savepoints are created manually or when we manually cancel/stop a job. We
> can then start the same job again by pointing to the savepoint. If we start
> a job without a savepoint, the job will start with an empty operator state.
>
> Correct me if I am wrong.
>
> Other references:
>
> https://stackoverflow.com/questions/62935269/apache-flink-how-checkpoint-savepoint-works-if-we-run-duplicate-jobs-multi-te
>
> https://stackoverflow.com/questions/64605940/apache-flink-fsstatebackend-how-state-is-recovered-in-case-of-task-manager-f
>
> https://stackoverflow.com/questions/55613112/is-it-possible-to-recover-after-losing-the-checkpoint-coordinator/55615858#55615858
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>
> Thank you
>
>
>
>
>
>
> On Tue, Feb 2, 2021 at 4:07 AM Jason Liu  wrote:
>
>> We currently have some logic to load data from S3 into memory in our
>> Flink/Kinesis Analytics app. This happens before the RichFunction.open()
>> function.
>>
>> We have two questions here and I can't find too much information in the
>> apache.org website:
>>
>> 1. (More of a clarification) When Flink does checkpointing/savepointing,
>>    only the state and the stream positions are saved, right? The memory
>>    content won't be saved and restored.
>> 2. When Flink restores from a checkpoint/savepoint, does it still go
>>    through the application initialization phase? Basically, will the code
>>    before the RichFunction's *open()* be run? If not, would the
>>    operators' open() functions run when Flink restores from a
>>    checkpoint/savepoint?
>>
>> Thanks,
>> Jason
>>
>
>
> --
> Raghavendar T S
> www.teknosrc.com
>
>
>


Re: Integration with Apache AirFlow

2021-02-02 Thread Arvid Heise
Hi Flavio,

If you know a bit of Python, it's also trivial to add a new Flink operator
to Airflow that uses the REST API.

In general, I'd consider Airflow to be the best choice for your problem,
especially if it gets more complicated in the future (do something else if
the first job fails).
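
As a rough sketch of what such REST-based logic could look like, the Python below starts an already-uploaded jar and polls the job until it reaches a terminal state, and only moves on to the next jar if the previous job finished cleanly. The endpoints (`POST /jars/:jarid/run`, `GET /jobs/:jobid`) are part of Flink's REST API; the function names, the injected `call` parameter and the assumption that the jars were uploaded beforehand are mine, not something from this thread.

```python
import json
import time
import urllib.request

# Job states after which the status will not change any more.
TERMINAL_STATES = {"FINISHED", "FAILED", "CANCELED"}


def http_json(method, url):
    """Send a request (empty JSON body for POST) and decode the JSON reply."""
    data = b"{}" if method == "POST" else None
    req = urllib.request.Request(
        url, data=data, method=method,
        headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


def run_jar_and_wait(base_url, jar_id, call=http_json, poll_seconds=5.0):
    """Start an uploaded jar and block until its job is in a terminal state."""
    job_id = call("POST", f"{base_url}/jars/{jar_id}/run")["jobid"]
    while True:
        state = call("GET", f"{base_url}/jobs/{job_id}")["state"]
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)


def run_in_order(base_url, jar_ids, call=http_json, poll_seconds=5.0):
    """Run jars one after another; stop at the first job that did not finish."""
    for jar_id in jar_ids:
        if run_jar_and_wait(base_url, jar_id, call, poll_seconds) != "FINISHED":
            return False
    return True
```

Inside Airflow, `run_jar_and_wait` could be the body of a custom operator or a PythonOperator callable, with the ordering expressed as a task dependency (`t1 >> t2`) instead of the loop in `run_in_order`.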

If you have specific questions, feel free to ask.

Best,

Arvid

On Tue, Feb 2, 2021 at 10:08 AM 姜鑫  wrote:

> Hi Flavio,
>
> I think I understand what you need. Apache Airflow is a scheduling
> framework in which you can define your own dependent operators, so you
> can define a BashOperator to submit a Flink job to your local Flink cluster.
> For example:
> ```
> t1 = BashOperator(
>     task_id='flink-wordcount',
>     bash_command='./bin/flink run flink/build-target/examples/batch/WordCount.jar',
>     ...
> )
> ```
> Airflow also supports submitting jobs to Kubernetes, and you can even
> implement your own operator if a bash command doesn't meet your needs.
>
> Indeed, Flink AI (flink-ai-extended) needs an enhanced version of
> Airflow, but it is mainly for streaming scenarios, where the job never
> stops. In your case, where all jobs are batch jobs, it doesn't help much.
> Hope this helps.
>
> Regards,
> Xin
>
>
> On Feb 2, 2021, at 4:30 PM, Flavio Pompermaier wrote:
>
> Hi Xin,
> let me state first that I have never used Airflow, so I am probably
> missing some background here.
> I just want to externalize the job scheduling to some consolidated
> framework, and from what I see Apache Airflow is probably what I need.
> However, I can't find any good blog post or documentation about how to
> integrate these 2 technologies using the REST APIs of both services.
> I saw that Flink AI decided to use a customized/enhanced version of
> Airflow [1], but I didn't look into the code to understand how they use it.
> In my use case I just want to schedule 2 Flink batch jobs using the REST
> API of Airflow, where the second one is fired after the first.
>
> [1] https://github.com/alibaba/flink-ai-extended/tree/master/flink-ai-flow
>
> Best,
> Flavio
>
> On Tue, Feb 2, 2021 at 2:43 AM 姜鑫  wrote:
>
>> Hi Flavio,
>>
>> Could you explain what your direct question is? In my opinion, it is
>> possible to define two Airflow operators to submit dependent Flink jobs,
>> as long as the first one can run to completion.
>>
>> Regards,
>> Xin
>>
>> On Feb 1, 2021, at 6:43 PM, Flavio Pompermaier wrote:
>>
>> Any advice here?
>>
>> On Wed, Jan 27, 2021 at 9:49 PM Flavio Pompermaier 
>> wrote:
>>
>>> Hello everybody,
>>> is there any suggested way/pointer to schedule Flink jobs using Apache
>>> AirFlow?
>>> What I'd like to achieve is the submission (using the REST API of
>>> AirFlow) of 2 jobs, where the second one can be executed only if the first
>>> one succeeds.
>>>
>>> Thanks in advance
>>> Flavio
>>>
>>
>
>


Re: Integration with Apache AirFlow

2021-02-02 Thread 姜鑫
Hi Flavio,

I think I understand what you need. Apache Airflow is a scheduling framework
in which you can define your own dependent operators, so you can define a
BashOperator to submit a Flink job to your local Flink cluster. For example:
```
t1 = BashOperator(
    task_id='flink-wordcount',
    bash_command='./bin/flink run flink/build-target/examples/batch/WordCount.jar',
    ...
)
```
Airflow also supports submitting jobs to Kubernetes, and you can even implement
your own operator if a bash command doesn't meet your needs.

Indeed, Flink AI (flink-ai-extended) needs an enhanced version of Airflow,
but it is mainly for streaming scenarios, where the job never stops. In your
case, where all jobs are batch jobs, it doesn't help much. Hope this helps.

Regards,
Xin


> On Feb 2, 2021, at 4:30 PM, Flavio Pompermaier wrote:
> 
> Hi Xin,
> let me state first that I have never used Airflow, so I am probably missing 
> some background here.
> I just want to externalize the job scheduling to some consolidated framework, 
> and from what I see Apache Airflow is probably what I need.
> However, I can't find any good blog post or documentation about how to 
> integrate these 2 technologies using the REST APIs of both services.
> I saw that Flink AI decided to use a customized/enhanced version of Airflow 
> [1], but I didn't look into the code to understand how they use it.
> In my use case I just want to schedule 2 Flink batch jobs using the REST API 
> of Airflow, where the second one is fired after the first.
> 
> [1] https://github.com/alibaba/flink-ai-extended/tree/master/flink-ai-flow 
> 
> 
> Best,
> Flavio
> 
> On Tue, Feb 2, 2021 at 2:43 AM 姜鑫 wrote:
> Hi Flavio,
> 
> Could you explain what your direct question is? In my opinion, it is possible 
> to define two Airflow operators to submit dependent Flink jobs, as long as the 
> first one can run to completion.
> 
> Regards,
> Xin
> 
>> On Feb 1, 2021, at 6:43 PM, Flavio Pompermaier wrote:
>> 
>> Any advice here?
>> 
>> On Wed, Jan 27, 2021 at 9:49 PM Flavio Pompermaier wrote:
>> Hello everybody,
>> is there any suggested way/pointer to schedule Flink jobs using Apache 
>> Airflow?
>> What I'd like to achieve is the submission (using the REST API of Airflow) 
>> of 2 jobs, where the second one can be executed only if the first one 
>> succeeds.
>> 
>> Thanks in advance
>> Flavio
> 



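Exception when writing to HDFS: org.apache.hadoop.fs.FileAlreadyExistsException

2021-02-02 Thread jiangjiguang719
Flink version: 1.10
The following exception is reported when writing to HDFS. Once it occurs, every subsequent checkpoint fails with the same exception; things return to normal after the job is taken offline and restarted.
What is going on here?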
level: WARN
location: org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:796)
log: 2021-02-02 07:45:09,024 ultron-flink2hdfs-pb2-log_midu_server WARN org.apache.flink.runtime.jobmaster.JobMaster - Error while processing checkpoint acknowledgement message
message: Error while processing checkpoint acknowledgement message
thread: jobmanager-future-thread-25
throwable: org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 21803. Failure reason: Failure to finalize checkpoint.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:863)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:781)
    at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:794)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: /user/flink/checkpoints/7872abd7426a82d972db818ac35a5d11/chk-21803/_metadata for client 172.16.1.165 already exists
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:3021)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2908)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2792)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:615)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:117)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:413)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2278)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2274)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2272)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1841)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1698)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1633)
    at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
    at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:891)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:788)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109)
 at 

Re: [Flink SQL] CompilerFactory cannot be cast error when executing statement

2021-02-02 Thread Timo Walther

Hi Sebastian,

sorry for the late reply. Could you solve the problem in the meantime? 
It definitely looks like a dependency conflict.


Regards,
Timo


On 22.01.21 18:18, Sebastián Magrí wrote:

Thanks a lot Matthias!

In the meantime I'm trying out something with the scala quickstart.


On Fri, 22 Jan 2021 at 17:12, Matthias Pohl wrote:


Ok, to be fair, I just did some research on the error message and
didn't realize that you're working with binaries only.

I tried to set it up on my machine to be able to reproduce your
error. Unfortunately, I wasn't able to establish the connection
between Flink and Postgres using your docker-compose.yml.
I'm going to cc Timo. Maybe, he has a guess what's causing this error.

Best,
Matthias

On Fri, Jan 22, 2021 at 4:35 PM Sebastián Magrí
<sebasma...@gmail.com> wrote:

Hi Matthias!

I went through that thread but as I'm just using the
`apache/flink` docker image for testing I honestly couldn't
figure out how I would do that since I don't have a pom file to
edit. If it's possible to do it through the configuration I'd be
glad if you could point me out in the right direction.

Pretty evident I don't have a lot of experience with mvn or
"modern" Java in general.

:-)

Thanks!

On Fri, 22 Jan 2021 at 15:19, Matthias Pohl
<matth...@ververica.com> wrote:

Hi Sebastián,
have you tried changing the dependency scope to provided
for flink-table-planner-blink as it is suggested in [1]?

Best,
Matthias

[1]

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-1-10-exception-Unable-to-instantiate-java-compiler-td38221.html



On Fri, Jan 22, 2021 at 4:04 PM Sebastián Magrí
<sebasma...@gmail.com> wrote:

Hi!

I'm trying out Flink SQL with the attached
docker-compose file.

It starts up and then I create a table with the
following statement:

CREATE TABLE mytable_simple (
   `customer_id` INT
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:postgresql://pgusr:pgpwd@postgres/pdgb',
   'table-name' = 'mytable'
);

However when I try to run this:

select * from mytable_simple;

I get the following error in the client:

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassCastException:
org.codehaus.janino.CompilerFactory cannot be cast to
org.codehaus.commons.compiler.ICompilerFactory

At first I thought it could be an incompatibility issue
with the libraries I was putting in, like the
postgres-cdc library version, but even after leaving
only the JDBC libraries in I still get the same error.

It'd be great if you could give me some pointers here.

Thanks!

-- 
Sebastián Ramírez Magrí




-- 
Sebastián Ramírez Magrí






--
Sebastián Ramírez Magrí




Help needed with a Flink SQL 'connector' = 'filesystem' problem!

2021-02-02 Thread yinghua...@163.com
Today, while using Flink SQL on Flink 1.11.3 to import data from Kafka into HDFS, I got the following error:
Caused by: org.apache.flink.table.api.TableException: Could not load service provider for factories.
    at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:346)
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:221)
    at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
    ... 39 more
Caused by: java.util.ServiceConfigurationError: org.apache.flink.table.factories.Factory: Provider org.apache.flink.core.fs.HadoopFsFactory not found
    at java.util.ServiceLoader.fail(ServiceLoader.java:239)
    at java.util.ServiceLoader.access$300(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:372)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)

The SQL statement is:
CREATE TABLE hdfs_table (
   content STRING,
   dt STRING,
   h STRING
) PARTITIONED BY (dt, h) WITH (
  'connector' = 'filesystem',
  'path '= 'hdfs://hdfsCluster/tmp/zyh_test',
  'format' = 'csv'
);

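After the error, we could not find any HDFS-related class implementing DynamicTableSinkFactory in the code. Does Flink SQL not support writing to HDFS? Should this be done through the Hive connector instead?

Before testing, we followed the steps from the official documentation quoted below and added the HDFS-related classes:
In org.apache.flink.table.factories.Factory: org.apache.flink.core.fs.HadoopFsFactory
In org.apache.flink.table.factories.TableFactory: org.apache.flink.table.filesystem.FileSystemTableFactory
But after adding them, the error above was reported.

Adding a new pluggable File System implementation
File systems are represented via the org.apache.flink.core.fs.FileSystem class, which captures the ways to access and modify files and objects in that file system.
To add a new file system:
- Add the File System implementation, which is a subclass of org.apache.flink.core.fs.FileSystem.
- Add a factory that instantiates that file system and declares the scheme under which the FileSystem is registered. This must be a subclass of org.apache.flink.core.fs.FileSystemFactory.
- Add a service entry. Create a file META-INF/services/org.apache.flink.core.fs.FileSystemFactory which contains the class name of your file system factory class (see the Java Service Loader docs for more details).
During plugin discovery, the file system factory class will be loaded by a dedicated Java class loader to avoid class conflicts with other plugins and Flink components. The same class loader should be used during file system instantiation and the file system operation calls.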









yinghua...@163.com


Re: Integration with Apache AirFlow

2021-02-02 Thread Flavio Pompermaier
Hi Xin,
let me state first that I have never used Airflow, so I am probably missing
some background here.
I just want to externalize the job scheduling to some consolidated
framework, and from what I see Apache Airflow is probably what I need.
However, I can't find any good blog post or documentation about how to
integrate these 2 technologies using the REST APIs of both services.
I saw that Flink AI decided to use a customized/enhanced version of Airflow
[1], but I didn't look into the code to understand how they use it.
In my use case I just want to schedule 2 Flink batch jobs using the REST
API of Airflow, where the second one is fired after the first.

[1] https://github.com/alibaba/flink-ai-extended/tree/master/flink-ai-flow
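
For the Flink side of "run the second job only after the first", a minimal sketch could look like the following. It assumes both jars were already uploaded to a Flink session cluster and uses two Flink REST endpoints, POST /jars/<jarid>/run and GET /jobs/<jobid>. The base URL, the jar IDs and all function names are hypothetical, and the AirFlow side (for example, calling run_in_sequence from a task) is not shown.

```python
import json
import time
import urllib.request

# Globally terminal Flink job states.
TERMINAL_STATES = {"FINISHED", "FAILED", "CANCELED"}


def http_json(method, url, body=None):
    """Send a JSON request and decode the JSON response."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    req = urllib.request.Request(url, data=data, method=method,
                                 headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))


def run_jar(base_url, jar_id, request=http_json):
    """Start an already uploaded jar via POST /jars/<jar_id>/run; return the job ID."""
    return request("POST", f"{base_url}/jars/{jar_id}/run", {})["jobid"]


def wait_for_job(base_url, job_id, request=http_json, poll_seconds=5.0, sleep=time.sleep):
    """Poll GET /jobs/<job_id> until the job reaches a terminal state."""
    while True:
        state = request("GET", f"{base_url}/jobs/{job_id}")["state"]
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)


def run_in_sequence(base_url, jar_ids, request=http_json, poll_seconds=5.0, sleep=time.sleep):
    """Run jars one after another; stop at the first job that does not finish."""
    for jar_id in jar_ids:
        job_id = run_jar(base_url, jar_id, request)
        state = wait_for_job(base_url, job_id, request, poll_seconds, sleep)
        if state != "FINISHED":
            raise RuntimeError(f"job {job_id} ended in state {state}")
```

The request and sleep parameters are injectable only so the sequencing logic can be exercised without a running cluster.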

Best,
Flavio

On Tue, Feb 2, 2021 at 2:43 AM 姜鑫  wrote:

> Hi Flavio,
>
> Could you explain what your direct question is? In my opinion, it is
> possible to define two Airflow operators to submit dependent Flink jobs, as
> long as the first one can run to completion.
>
> Regards,
> Xin
>
> On Feb 1, 2021, at 6:43 PM, Flavio Pompermaier  wrote:
>
> Any advice here?
>
> On Wed, Jan 27, 2021 at 9:49 PM Flavio Pompermaier 
> wrote:
>
>> Hello everybody,
>> is there any suggested way/pointer to schedule Flink jobs using Apache
>> AirFlow?
>> What I'd like to achieve is the submission (using the REST API of
>> AirFlow) of 2 jobs, where the second one can be executed only if the first
>> one succeeds.
>>
>> Thanks in advance
>> Flavio
>>
>


Integrating Flink jobs into our own system platform

2021-02-02 Thread Jacob
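I have a vague requirement, and I am not sure whether it is reasonable.

Currently all of our real-time computing jobs run on a Hadoop cluster in on-YARN mode. Every time we submit a new job, we do it from the Flink client with ./bin/flink
run-application -t yarn-application ...

We now have a self-developed data processing platform, and Flink
jobs are one stage of that processing. We are wondering whether we could add a menu to our system's portal where we upload the jar of a Flink project and submit the job to the Hadoop cluster. That would give us integrated management, without having to go to a Flink client for every submission. Is this requirement reasonable?

I assume that if we submit jobs from our own platform, we would first have to integrate the Flink client into our system. Otherwise, how would the job be started?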
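
One possible shape for this, as a sketch only: the platform backend calls the same ./bin/flink run-application -t yarn-application command that is typed by hand today. This assumes a Flink client distribution is installed on the platform host. The function names, paths and the example configuration value are hypothetical.

```python
import subprocess


def build_submit_command(flink_home, jar_path, program_args=(), config=None):
    """Build the argv for `flink run-application -t yarn-application`."""
    cmd = [f"{flink_home}/bin/flink", "run-application", "-t", "yarn-application"]
    for key, value in sorted((config or {}).items()):
        cmd.append(f"-D{key}={value}")  # generic -Dkey=value configuration option
    cmd.append(jar_path)
    cmd.extend(program_args)
    return cmd


def submit(flink_home, jar_path, program_args=(), config=None):
    """Run the Flink client and return its standard output; raises on a non-zero exit."""
    result = subprocess.run(
        build_submit_command(flink_home, jar_path, program_args, config),
        check=True, capture_output=True, text=True,
    )
    return result.stdout
```

A portal upload handler could store the jar and then call, for example, submit("/opt/flink", "/data/uploads/job.jar", config={"yarn.application.name": "demo"}).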

The requirement is rather vague, so please bear with me.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink checkpoint fails: Checkpoint Coordinator is suspending.

2021-02-02 Thread Congxian Qiu
Hi
 Which Flink version are you using, and what is your job's checkpoint/state configuration? If possible, please send the complete JM log.
Best,
Congxian


On Mon, Feb 1, 2021 at 5:41 PM chen310 <1...@163.com> wrote:

> To add to this, here are the exceptions from the JobManager log:
>
> 2021-02-01 08:54:43,639 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Exception
> occurred in REST handler: Job 65892aaedb8064e5743f04b54b5380df not found
> 2021-02-01 08:54:44,642 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Exception
> occurred in REST handler: Job 65892aaedb8064e5743f04b54b5380df not found
> 2021-02-01 08:54:45,644 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Exception
> occurred in REST handler: Job 65892aaedb8064e5743f04b54b5380df not found
> 2021-02-01 08:54:46,647 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Exception
> occurred in REST handler: Job 65892aaedb8064e5743f04b54b5380df not found
> 2021-02-01 08:54:47,649 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Exception
> occurred in REST handler: Job 65892aaedb8064e5743f04b54b5380df not found
> 2021-02-01 08:54:48,652 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Exception
> occurred in REST handler: Job 65892aaedb8064e5743f04b54b5380df not found
> 2021-02-01 08:54:49,655 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Exception
> occurred in REST handler: Job 65892aaedb8064e5743f04b54b5380df not found
> 2021-02-01 08:54:50,658 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Exception
> occurred in REST handler: Job 65892aaedb8064e5743f04b54b5380df not found
> 2021-02-01 08:54:50,921 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Triggering
> checkpoint 8697 (type=CHECKPOINT) @ 1612169690917 for job
> 1299f2f27e56ec36a4e0ffd3472ad399.
> 2021-02-01 08:54:50,999 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Decline
> checkpoint 8697 by task 320d2c162f17265435777bb65e1a8934 of job
> 1299f2f27e56ec36a4e0ffd3472ad399 at
> container_e21_1596002540781_1159_01_000134 @
> ip-10-120-83-22.ap-northeast-1.compute.internal (dataPort=42984).
> 2021-02-01 08:54:51,661 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Exception
> occurred in REST handler: Job 65892aaedb8064e5743f04b54b5380df not found
> 2021-02-01 08:54:52,654 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
> GroupWindowAggregate(window=[SlidingGroupWindow('w$, requestDateTime,
> 180, 60)], properties=[w$start, w$end, w$rowtime, w$proctime],
> select=[COUNT(DISTINCT $f1) AS totalCount, start('w$) AS w$start, end('w$)
> AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) ->
> Calc(select=[(UNIX_TIMESTAMP((w$start DATE_FORMAT _UTF-16LE'-MM-dd
> HH:mm:ss')) * 1000) AS requestTime, totalCount]) (1/1)
> (6beee54a923323c369b046e199f572c4) switched from RUNNING to FAILED on
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@379a8f9c.
> java.io.IOException: Could not perform checkpoint 8697 for operator
> GroupWindowAggregate(window=[SlidingGroupWindow('w$, requestDateTime,
> 180, 60)], properties=[w$start, w$end, w$rowtime, w$proctime],
> select=[COUNT(DISTINCT $f1) AS totalCount, start('w$) AS w$start, end('w$)
> AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) ->
> Calc(select=[(UNIX_TIMESTAMP((w$start DATE_FORMAT _UTF-16LE'-MM-dd
> HH:mm:ss')) * 1000) AS requestTime, totalCount]) (1/1).
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:897) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:137) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
>
>