Re: Apache Flink - Flink Metrics - How to distinguish b/w metrics for two job manager on the same host

2019-12-18 Thread Biao Liu
Hi Mans,

That's indeed a problem. We have a plan to fix it. I think it could be
included in 1.11. You could follow this issue [1] to check the progress.

[1] https://issues.apache.org/jira/browse/FLINK-9543

Thanks,
Biao /'bɪ.aʊ/



On Thu, 19 Dec 2019 at 14:51, vino yang  wrote:

> Hi Mans,
>
> IMO, one job manager represents one Flink cluster, and one Flink cluster
> has a suite of Flink configuration, e.g., the metrics reporter.
>
> Some metrics reporters support a tag feature; you can use it to
> distinguish different Flink clusters. [1]
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#datadog-orgapacheflinkmetricsdatadogdatadoghttpreporter
>
> Best,
> Vino
>
M Singh wrote on Thu, Dec 19, 2019 at 2:54 AM:
>
>> Hi:
>>
>> I am using AWS EMR with a Flink application, and two of the job managers are
>> running on the same host.  I am looking at the metrics documentation (Apache
>> Flink 1.9 Documentation: Metrics) and see the following:
>>
>>    - metrics.scope.jm
>>       - Default: <host>.jobmanager
>>       - Applied to all metrics that were scoped to a job manager.
>>
>> ...
>> List of all Variables
>>
>>    - JobManager: <host>
>>    - TaskManager: <host>, <tm_id>
>>    - Job: <job_id>, <job_name>
>>    - Task: <task_id>, <task_name>, <task_attempt_id>,
>>    <task_attempt_num>, <subtask_index>
>>    - Operator: <operator_id>, <operator_name>, <subtask_index>
>>
>> My question is: is there a way to distinguish b/w the two job managers? I
>> see only the <host> variable for JobManager, and since the two are running
>> on the same host, the value is the same.  Is there any other variable that
>> I can use to distinguish the two?
>>
>> For taskmanager I have taskmanager id but am not sure about the job
>> manager.
>>
>> Thanks
>>
>> Mans
>>
>>


Re: Apache Flink - Flink Metrics - How to distinguish b/w metrics for two job manager on the same host

2019-12-18 Thread vino yang
Hi Mans,

IMO, one job manager represents one Flink cluster, and one Flink cluster has
a suite of Flink configuration, e.g., the metrics reporter.

Some metrics reporters support a tag feature; you can use it to
distinguish different Flink clusters. [1]

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#datadog-orgapacheflinkmetricsdatadogdatadoghttpreporter
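
For example, with the Datadog reporter from [1], each cluster could carry its
own tag in its flink-conf.yaml (the reporter name "dghttp" and the tag values
below are only placeholders):

metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: <your-api-key>
metrics.reporter.dghttp.tags: clusterA,prod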

Best,
Vino

M Singh wrote on Thu, Dec 19, 2019 at 2:54 AM:

> Hi:
>
> I am using AWS EMR with a Flink application, and two of the job managers are
> running on the same host.  I am looking at the metrics documentation (Apache
> Flink 1.9 Documentation: Metrics) and see the following:
>
>    - metrics.scope.jm
>       - Default: <host>.jobmanager
>       - Applied to all metrics that were scoped to a job manager.
>
> ...
> List of all Variables
>
>    - JobManager: <host>
>    - TaskManager: <host>, <tm_id>
>    - Job: <job_id>, <job_name>
>    - Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>,
>    <subtask_index>
>    - Operator: <operator_id>, <operator_name>, <subtask_index>
>
> My question is: is there a way to distinguish b/w the two job managers? I see
> only the <host> variable for JobManager, and since the two are running on
> the same host, the value is the same.  Is there any other variable that I
> can use to distinguish the two?
>
> For taskmanager I have taskmanager id but am not sure about the job
> manager.
>
> Thanks
>
> Mans
>
>


Kafka table descriptor missing startFromTimestamp()

2019-12-18 Thread Steve Whelan
Examining the org.apache.flink.table.descriptors.Kafka class in Flink v1.8,
it has the following startup modes for consumers:
.startFromEarliest()
.startFromLatest()
.startFromSpecificOffsets(...)

However, it does not have a method to support starting from a Timestamp.
The FlinkKafkaConsumer supports this feature though. Was it a conscious
decision to leave that startup mode out of the table descriptor? If so,
what was the reasoning?
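
For reference, a minimal sketch of the DataStream-side equivalent that the
consumer already exposes (the topic, group id, timestamp and class name below
are placeholders, not taken from this thread):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class TimestampStartupExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.setProperty("group.id", "my-group");                // placeholder group id

    FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
    // Start from the earliest record whose timestamp is >= the given epoch millis.
    consumer.setStartFromTimestamp(1546300800000L);

    env.addSource(consumer).print();
    env.execute("timestamp-startup-example");
  }
}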


Re: How to use Flink SQL to continuously query the number of users who logged in to the site in the past 30 minutes?

2019-12-18 Thread Yuan,Youjun
Sorry, I didn't see this question earlier.
Darwin-amd64 is simply the executable format on macOS. Trust it and it can be run directly.

-Original Message-
From: 陈帅
Sent: Saturday, December 7, 2019 10:48 PM
To: user-zh@flink.apache.org
Subject: Re: How to use Flink SQL to continuously query the number of users who logged in to the site in the past 30 minutes?

How is the executable "creek" generated by this platform implemented? What is the format of the file downloaded for the Darwin-amd64 environment?

Yuan,Youjun wrote on Sat, Dec 7, 2019 at 8:32 PM:

> Yes, it has exactly the same SQL interface as Flink; it is a streaming framework designed for edge computing.
>
>
> -Original Message-
> From: 陈帅
> Sent: Saturday, December 7, 2019 11:36 AM
> To: user-zh@flink.apache.org
> Subject: Re: How to use Flink SQL to continuously query the number of users who logged in to the site in the past 30 minutes?
>
> Your platform is quite convenient for quick verification. Is it an extension of Flink SQL?
> It doesn't fully solve my problem, but thank you anyway.
>
> Yuan,Youjun wrote on Thu, Dec 5, 2019 at 10:41 AM:
>
> > You can handle this with a 30-minute RANGE OVER window, but the two 0-value outputs you mentioned probably cannot be produced: no data, no output.
> > Assuming your input contains two fields, ts and userid (the timestamp and the user id), the SQL should look like this:
> > INSERT INTO mysink
> > SELECT
> >ts, userid,
> >COUNT(userid)
> >OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN 
> > INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW) AS cnt FROM mysrc
> >
> > Take the following input as an example:
> > "2019-12-05 12:02:00,user1",
> > "2019-12-05 12:13:00,user1",
> > "2019-12-05 12:15:00,user1",
> > "2019-12-05 12:31:00,user1",
> > "2019-12-05 12:40:00,user1",
> > "2019-12-05 12:45:00,user1"
> > it produces the following results:
> > {"cnt":1,"ts":157554732,"userid":"user1"}
> > {"cnt":2,"ts":157554798,"userid":"user1"}
> > {"cnt":3,"ts":157554810,"userid":"user1"}
> > {"cnt":4,"ts":157554906,"userid":"user1"}
> > {"cnt":4,"ts":157554960,"userid":"user1"}
> > {"cnt":4,"ts":157554990,"userid":"user1"}
> >
> > To verify the SQL above, you can paste the following job definition into the job definition box at http://creek.baidubce.com/,
> > click to generate an executable, run the downloaded executable, and you will see the results:
> > {
> > "sources": [{
> > "schema": {
> > "format": "CSV",
> > "fields": [{
> > "name": "ts",
> > "type": "SQL_TIMESTAMP"
> > },
> > {
> > "name": "userid",
> > "type": "STRING"
> > }]
> > },
> > "watermark": 0,
> > "name": "mysrc",
> > "eventTime": "ts",
> > "type": "COLLECTION",
> > "attr": {
> > "input":[
> > "2019-12-05 12:02:00,user1",
> > "2019-12-05 12:13:00,user1",
> > "2019-12-05 12:15:00,user1",
> > "2019-12-05 12:31:00,user1",
> > "2019-12-05 12:40:00,user1",
> > "2019-12-05 12:45:00,user1"
> >   ]
> >   }
> > }],
> > "sink": {
> > "schema": {
> > "format": "JSON"
> > },
> > "name": "mysink",
> > "type": "STDOUT"
> > },
> > "name": "demojob",
> > "timeType": "EVENTTIME",
> > "sql": "INSERT INTO mysink SELECT rowtime, userid, COUNT(userid) 
> > OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN INTERVAL '30'
> > MINUTE PRECEDING AND CURRENT ROW) AS cnt FROM mysrc"
> > }
> >
> >
> > Of course, the example above uses event time; processing time also works. To verify, you can change source.type from COLLECTION to STDIN and
> > timeType from EVENTTIME to PROCESSTIME, regenerate and run, and feed in data from the command line.
> >
> > 袁尤军
> >
> > -Original Message-
> > From: 陈帅
> > Sent: Wednesday, December 4, 2019 11:40 PM
> > To: user-zh@flink.apache.org
> > Subject: How to use Flink SQL to continuously query the number of users who logged in to the site in the past 30 minutes?
> >
> > For example, a user logs in at the following times: none, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, none
> > Then I expect the counts obtained at the following times (the actual query could happen at any time) to be
> > 12:01 (0),  12:03 (1),  12:14 (2),  12:16 (3), 12:30 (4), 12:35 (4),
> > 12:41 (5), 12:46 (4), 13:16 (0)
> > That is, every incoming element gets a 30-minute expiration, and the window state maintains the set of elements that have not yet expired.
> >
> > With a sliding window, the slide would have to be set to 1 second, so the number of windows would blow up a lot, while in fact I only need to count one of them; the extra windows are wasted.
> > I also considered an OVER window, but I don't know whether it supports processing time, because my scenario needs the count to change as processing time advances. I tried implementing it with
> > the DataStream API, using the timerService to set a per-element expiration time, but in my tests elements expired more slowly than new elements arrived, so the state size kept growing.
> >
> > So I'd like to ask:
> > 1. Is there a standard approach for this kind of case? Does SQL support it?
> > 2. How can the timerService performance problem be solved? Is the underlying timerService implementation a single thread processing a priority queue?
> >
> > Thanks!
> > 陈帅
> >
>


Re: Operators resource requirements on K8s Flink session cluster

2019-12-18 Thread Yang Wang
Hi Michaël,

Glad to hear that you are going to run Flink workloads on Kubernetes. AFAIK,
we have two deployment options.
1. Running a Flink standalone session/per-job cluster on K8s. You need to
calculate how many taskmanagers you need and the resources per taskmanager.
All the taskmanagers will be started by a K8s deployment. You could find more
information here [1]. In this mode, you could use `kubectl scale` to change the
replicas of the taskmanager deployment if the resources are not enough for
your job (see the sketch below).
2. Natively running a Flink session/per-job cluster on K8s. The session mode
has been supported in the master branch and will be released in 1.10. The
per-job mode is under discussion. In both cases, taskmanagers will be
allocated dynamically on demand. You could use a simple command to start a
Flink cluster on K8s. More information can be found here [2].
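
As a rough illustration of option 1 (the Deployment name and the resource
values below are assumptions, not taken from the linked docs): scaling the
taskmanager Deployment and declaring per-container resources would look
something like

kubectl scale deployment flink-taskmanager --replicas=4

and, in the taskmanager container spec:

resources:
  requests:
    cpu: "2"
    memory: "4Gi"
  limits:
    cpu: "2"
    memory: "4Gi"

Note that these requests/limits apply to the whole taskmanager container, not
to individual Flink operators.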


Best,
Yang

[1].
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
[2].
https://docs.google.com/document/d/1-jNzqGF6NfZuwVaFICoFQ5HFFXzF5NVIagUZByFMfBY/edit?usp=sharing


Michaël Melchiore wrote on Thu, Dec 19, 2019 at 1:11 AM:

> Hello,
>
> I plan to run topologies on a Flink session cluster on Kubernetes.
> In my topologies, operators will have varying resource requirements in
> terms of CPU and RAM.
> How can I make this information available from Flink to Kubernetes so
> that the latter takes it into account to optimize its deployment?
>
> I am trying to achieve something similar to the Apache Storm/Trident Resource
> Aware Scheduler.
>
> Kind regards,
>
> Michaël
>


Re: Rich Function Thread Safety

2019-12-18 Thread Zhu Zhu
Hi Aaron,

It is thread safe since the state snapshot happens in the same thread as
the user function.

Thanks,
Zhu Zhu

Aaron Langford wrote on Thu, Dec 19, 2019 at 11:25 AM:

> Hello Flink Community,
>
> I'm hoping to verify some understanding:
>
> If I have a function with managed state, I'm wondering if a
> checkpoint will ever be taken while a function is mutating state. I'll try
> to illustrate the situation I'm hoping to be safe from:
>
> Happy Path:
> t0 -> processFunction invoked with el1
> t1 -> set A to 5
> t2 -> set B to 10
> t3 -> function returns
>
> Unhappy path:
> t0 -> processFunction invoked with el1
> t1 -> set A to 5
> t2 -> function interrupted, checkpoint taken (A = 5, B = 1)
> t3 -> set B to 10
> t4 -> function returns
> ...
> tn -> flink application fails, restart from prev checkpoint (A=5, B=1)
> tn+1 -> recovery begins somewhere, but state is torn anyway, so we're
> going to have a bad time
>
> I don't think this could happen given that checkpoints effectively are
> messages in the pipeline, and the checkpoint is only taken when an operator
> sees the checkpoint barrier.
>
> Hoping to make sure this is correct!
>
> Aaron
>


Rich Function Thread Safety

2019-12-18 Thread Aaron Langford
Hello Flink Community,

I'm hoping to verify some understanding:

If I have a function with managed state, I'm wondering if a checkpoint will
ever be taken while a function is mutating state. I'll try to illustrate
the situation I'm hoping to be safe from:

Happy Path:
t0 -> processFunction invoked with el1
t1 -> set A to 5
t2 -> set B to 10
t3 -> function returns

Unhappy path:
t0 -> processFunction invoked with el1
t1 -> set A to 5
t2 -> function interrupted, checkpoint taken (A = 5, B = 1)
t3 -> set B to 10
t4 -> function returns
...
tn -> flink application fails, restart from prev checkpoint (A=5, B=1)
tn+1 -> recovery begins somewhere, but state is torn anyway, so we're going
to have a bad time

I don't think this could happen given that checkpoints effectively are
messages in the pipeline, and the checkpoint is only taken when an operator
sees the checkpoint barrier.

Hoping to make sure this is correct!

Aaron


Re: [Question] How to use different filesystem between checkpointdata and user data sink

2019-12-18 Thread ouywl

Hi Piotr Nowojski,
    I have moved my filesystem plugin to FLINK_HOME/plugins in Flink 1.9.1. The jobmanager does not start up, and it loads the filesystem plugin from my own plugin jar. The log is:

"2019-12-19 10:58:32,394 WARN  org.apache.flink.configuration.Configuration  - Config uses deprecated configuration key 'high-availability.zookeeper.storageDir' instead of proper key 'high-availability.storageDir'
2019-12-19 10:58:32,398 INFO  com.filesystem.plugin.FileSystemFactoryEnhance-  trying to get hadoopEnv, hadoopPath = /conf/hadoop_conf
2019-12-19 10:58:32,434 WARN  org.apache.hadoop.conf.Configuration  - /tmp/mammut-core-site.xml:an attempt to override final parameter: fs.defaultFS;  Ignoring.
2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration  - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.data.dir;  Ignoring.
2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration  - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.failed.volumes.tolerated;  Ignoring.
2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration  - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.namenode.name.dir;  Ignoring.
2019-12-19 10:58:32,878 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shutting YarnJobClusterEntrypoint down with application status FAILED. Diagnostics java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:119)
	at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:120)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:292)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:257)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:202)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:164)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:163)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501)
	at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)"

ouywl
ou...@139.com

On 12/19/2019 00:01, Piotr Nowojski wrote:

Hi,

As Yang Wang pointed out, you should use the new plugins mechanism.

If it doesn't work, first make sure that you are shipping/distributing the plugins jars correctly - the correct plugins directory structure both on the client machine. Next make sure that the cluster has the same correct setup. This is especially true for the standalone/cluster execution modes. For yarn, mesos, docker the plugins dir should be shipped to the cluster by Flink itself, however plugins support in yarn is currently semi broken [1]. This is already fixed, but waiting to be released in 1.9.2 and 1.10.

If it still doesn't work, look in the TaskManager logs for which plugins/file systems are being loaded during startup. If none are, that's the problem.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-14382

On 18 Dec 2019, at 12:40, Yang Wang wrote:

You could have a try the new plugin mechanism.
Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then put your filesystem related jars in it.
Different plugins will be loaded by separate classloaders to avoid conflicts.

Best,
Yang

vino yang wrote on Wed, Dec 18, 2019 at 6:46 PM:

Hi ouywl,

>> Thread.currentThread().getContextClassLoader();

What does this statement mean in your program?
In addition, can you share your implementation of the customized file system plugin and the related exception?

Best,
Vino

ouywl wrote on Wed, Dec 18, 2019 at 4:59 PM:

Hi

Re: [DISCUSS] What parts of the Python API should we focus on next ?

2019-12-18 Thread jincheng sun
Also CC user-zh.

Best,
Jincheng


jincheng sun wrote on Thu, Dec 19, 2019 at 10:20 AM:

> Hi folks,
>
> As release-1.10 is under feature-freeze(The stateless Python UDF is
> already supported), it is time for us to plan the features of PyFlink for
> the next release.
>
> To make sure the features supported in PyFlink are the most demanded by
> the community, we'd like to get more people involved, i.e., it would be
> better if all of the devs and users join the discussion of which kinds of
> features are more important and urgent.
>
> We have already listed some features from different aspects which you can
> find below, however it is not the ultimate plan. We appreciate any
> suggestions from the community, either on the functionalities or
> performance improvements, etc. Would be great to have the following
> information if you want to suggest to add some features:
>
> -
> - Feature description: 
> - Benefits of the feature: 
> - Use cases (optional): 
> --
>
> Features in my mind
>
> 1. Integration with most popular Python libraries
> - fromPandas/toPandas API
>Description:
>   Support to convert between Table and pandas.DataFrame.
>Benefits:
>   Users could switch between Flink and Pandas API, for example, do
> some analysis using Flink and then perform analysis using the Pandas API if
> the result data is small and could fit into the memory, and vice versa.
>
> - Support Scalar Pandas UDF
>Description:
>   Support scalar Pandas UDF in Python Table API & SQL. Both the
> input and output of the UDF is pandas.Series.
>Benefits:
>   1) Scalar Pandas UDF performs better than row-at-a-time UDF,
> ranging from 3x to over 100x (from pyspark)
>   2) Users could use Pandas/Numpy API in the Python UDF
> implementation if the input/output data type is pandas.Series
>
> - Support Pandas UDAF in batch GroupBy aggregation
>Description:
>Support Pandas UDAF in batch GroupBy aggregation of Python
> Table API & SQL. Both the input and output of the UDF is pandas.DataFrame.
>Benefits:
>   1) Pandas UDAF performs better than row-at-a-time UDAF more than
> 10x in certain scenarios
>   2) Users could use Pandas/Numpy API in the Python UDAF
> implementation if the input/output data type is pandas.DataFrame
>
> 2. Fully support  all kinds of Python UDF
> - Support Python UDAF(stateful) in GroupBy aggregation (NOTE: Please
> give us some use case if you want this feature to be contained in the next
> release)
>   Description:
> Support UDAF in GroupBy aggregation.
>   Benefits:
> Users could define and use Python UDAF and use it in GroupBy
> aggregation. Without it, users have to use Java/Scala UDAF.
>
> - Support Python UDTF
>   Description:
>Support  Python UDTF in Python Table API & SQL
>   Benefits:
> Users could define and use Python UDTF in Python Table API & SQL.
> Without it, users have to use Java/Scala UDTF.
>
> 3. Debugging and Monitoring of Python UDF
>- Support User-Defined Metrics
>  Description:
>Allow users to define user-defined metrics and global job
> parameters with Python UDFs.
>  Benefits:
>UDF needs metrics to monitor some business or technical indicators,
> which is also a requirement for UDFs.
>
>- Make the log level configurable
>  Description:
>Allow users to config the log level of Python UDF.
>  Benefits:
>Users could configure different log levels when debugging and
> deploying.
>
> 4. Enrich the Python execution environment
>- Docker Mode Support
>  Description:
>  Support running python UDF in docker workers.
>  Benefits:
>  Support various deployments to meet more users' requirements.
>
> 5. Expand the usage scope of Python UDF
>- Support to use Python UDF via SQL client
>  Description:
>  Support to register and use Python UDF via SQL client
>  Benefits:
>  SQL client is a very important interface for SQL users. This
> feature allows SQL users to use Python UDFs via SQL client.
>
>- Integrate Python UDF with Notebooks
>  Description:
>  Such as Zeppelin, etc (Especially Python dependencies)
>
>- Support to register Python UDF into catalog
>   Description:
>   Support to register Python UDF into catalog
>   Benefits:
>   1)Catalog is the centralized place to manage metadata such as
> tables, UDFs, etc. With it, users could register the UDFs once and use it
> anywhere.
>   2) It's an important part of the SQL functionality. If Python
> UDFs are not supported to be registered and used in catalog, Python UDFs
> could not be shared between jobs.
>
> 6. Performance Improvements of Python UDF
>- Cython improvements
>   Description:
>   Cython Improvements in coder & operations
>   Benefits:
>   Initial 


Re: How to convert retract stream to dynamic table?

2019-12-18 Thread Kurt Young
Hi James,

If I understand correctly, you can use `TableEnvironment#sqlQuery` to achieve
what you want. You can pass the whole SQL statement in and get a `Table` back
from the method. I believe this is the table you want, which is semantically
equivalent to the stream you mentioned.

For example, you can further operate on the `Table` with other SQL operations,
like `GROUP BY cnt` on the returned table. You can think of it this way: Flink
would attach another aggregation operator to the original plan, and this
operator can consume the retraction stream which the original SQL statement
produced and start to generate correct results.
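
In code form, that looks roughly like this (a sketch; it assumes a
StreamTableEnvironment named `tEnv` with the `clicks` table from the docs
example already registered):

// First query: produces a retraction stream under the hood.
Table userCounts = tEnv.sqlQuery(
    "SELECT user, COUNT(url) AS cnt FROM clicks GROUP BY user");

// Register the result and keep querying it; the follow-up aggregation consumes
// the retractions produced by the first query and stays correct under updates.
tEnv.registerTable("user_counts", userCounts);
Table histogram = tEnv.sqlQuery(
    "SELECT cnt, COUNT(*) AS num_users FROM user_counts GROUP BY cnt");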

Best,
Kurt


On Thu, Dec 19, 2019 at 1:25 AM James Baker  wrote:

> Hi!
> I've been looking at Flink for the last few days and have very much
> appreciated the concept of Dynamic Tables, it solves a lot of my needs and
> handles a lot of the complex state tracking that is otherwise painful. I
> have a question about the composability of the system which the docs don't
> answer.
>
> The docs use the example of 'SELECT user, COUNT(url) as cnt FROM clicks
> GROUP BY user', where clicks is a stream coming in of user and the url
> they've clicked.
>
> From such a Table, I can then get a retract stream written into an
> external system, perhaps outputting (true, User1, 1), ..., (true, User1, 2)
> indicating that User1's clicked on something.
>
> Is there an idiomatic way to convert a retract stream into a semantically
> equivalent table?
>
> Thanks,
> James
>


Re: MiniCluster with ProcessingTimeTrigger

2019-12-18 Thread Biao Liu
Hi John,

The critical issue with your test case is that it's a finite streaming job.
Whether it runs on the mini cluster or a distributed cluster does not matter.

When the job is finishing, some windows have not been triggered yet. The
current behavior is to drop these windows. That is acceptable from the
perspective of window semantics: because the condition (count/time based)
is not fulfilled, the window should not be triggered.

Normally we use windows in infinite (or long-running) streaming jobs. That
means finishing or stopping should be rare. Even if the job is finished or
stopped, the windows not yet triggered would also be dropped. BUT, they
could be replayed if the job recovers from a checkpoint or savepoint.

Let's get back to your test case. If you want the correct result, you have
to fulfill the trigger condition of the window. If the condition is processing
time based with 2 seconds, you have to guarantee the window could run for at
least 2 seconds. However, it's not a good idea to design a time-based test
case. I would suggest designing the case this way: the source does not exit
until the window is triggered. The source could block on some signal (a
CountDownLatch?) which would be released after the window has been
triggered.
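
A rough sketch of that idea (test-only code; the class name, element count and
latch wiring are assumptions, not taken from your test):

import java.util.concurrent.CountDownLatch;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

public class BlockingSource extends RichSourceFunction<Integer> {

  // Count this down from the test (e.g. in the trigger's onProcessingTime)
  // once the window has fired, so the source - and the job - can finish.
  public static final CountDownLatch WINDOW_FIRED = new CountDownLatch(1);

  private volatile boolean running = true;

  @Override
  public void run(SourceContext<Integer> ctx) throws Exception {
    for (int i = 1; i <= 10 && running; i++) {
      synchronized (ctx.getCheckpointLock()) {
        ctx.collect(i);
      }
    }
    // Keep the source alive until the window has been triggered.
    WINDOW_FIRED.await();
  }

  @Override
  public void cancel() {
    running = false;
    WINDOW_FIRED.countDown();
  }
}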

Thanks,
Biao /'bɪ.aʊ/



On Thu, 19 Dec 2019 at 06:06, John Morrow  wrote:

> Thanks Biao!
>
> I tried slowing down the input stream by replacing the
> env.fromCollection() with a custom SourceFunction (below) which drip feeds
> the data a bit slower. By the way, in my real scenario the datasource for
> the pipeline will be a RabbitMQ source.
>
>
> I do get better results, but it seems like a timing issue still exists:
>
>   +-- StreamTest [OK]
>   | +-- testPipelineWithProcessingTimeTrigger() 10480 ms [X] expected:
> <10> but was: <6>
>   | '-- testPipelineWithCountTrigger() [OK]
>
>
>
> I can probably play around with the window-size & sleep times below and
> get my tests to pass but I'm more concerned if there's a potential race
> condition/coordination step, outside of the MiniCluster test environment,
> that I should be dealing with.
>
> My pipeline is broken into two parts: pipeA & pipeB. I've done this
> because the 2nd part has an async operator so it needs to be at the start
> of a chain. For a non-MiniCluster environment, would it be possible for
> records to flow through pipeA and not reach pipeB, as I'm seeing with the
> MiniCluster? i.e. is there's something I need to do to explicitly
> connect/sync pipeA & pipeB before calling env.execute(), besides the fact
> that:
>
> pipeB = AsyncDataStream.unorderedWait(pipeA, ...
>
>
>
> Thanks!
> John.
>
>
>
>
> public class StreamTest {
>
>   private static class DripFeed extends RichSourceFunction<Integer> {
>
> private volatile boolean isRunning = false;
> private final int inputSize;
>
> public DripFeed(int inputSize) {
>   this.inputSize = inputSize;
> }
>
> @Override
> public void open(Configuration parameters) {
>   isRunning = true;
> }
>
> @Override
>     public void run(SourceContext<Integer> ctx) throws Exception {
>       List<Integer> listOfNumbers = IntStream.rangeClosed(1,
> inputSize).boxed().collect(Collectors.toList());
>       Iterator<Integer> iterator = listOfNumbers.iterator();
>   while (isRunning && iterator.hasNext()) {
> try {
>           Thread.sleep(100L);
> } catch (InterruptedException e) {
>   System.out.println();
> }
> ctx.collect(iterator.next());
>   }
>   try {
>         Thread.sleep(1000L);
>   } catch (InterruptedException e) {
> System.out.println();
>   }
> }
>
> @Override
> public void cancel() {
>   isRunning = false;
> }
>
>   }
>
>   @Test // :)
>   @Tag("unit")
>   public void testPipelineWithCountTrigger() throws Exception {
>     runPipeline(10, CountTrigger.of(10));
>   }
>
>   @Test // :(
>   @Tag("unit")
>   public void testPipelineWithProcessingTimeTrigger() throws Exception {
>     runPipeline(10, ProcessingTimeTrigger.create());
>   }
>
>
>   private void runPipeline(int inputSize, Trigger<Object, TimeWindow>
> trigger) throws Exception {
>
> MiniClusterWithClientResource miniCluster = new
> MiniClusterWithClientResource(
> new MiniClusterResourceConfiguration.Builder()
> .setNumberSlotsPerTaskManager(1)
> .setNumberTaskManagers(1)
> .setShutdownTimeout(org.apache.flink.api.common.time.Time.of(1,
> TimeUnit.DAYS))
> .build()
> );
> miniCluster.before();
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> CollectSink.values.clear();
>
>     List<Integer> listOfNumbers = IntStream.rangeClosed(1,
> inputSize).boxed().collect(Collectors.toList());
>
> // 1st half of pipeline
>     //DataStream<List<Integer>> pipeA = env.fromCollection(listOfNumbers)
>     DataStream<List<Integer>> pipeA = env.addSource(new
> StreamTest.DripFeed(inputSize))
> 

[DISCUSS] What parts of the Python API should we focus on next ?

2019-12-18 Thread jincheng sun
Hi folks,

As release-1.10 is under feature-freeze(The stateless Python UDF is already
supported), it is time for us to plan the features of PyFlink for the next
release.

To make sure the features supported in PyFlink are the most demanded by
the community, we'd like to get more people involved, i.e., it would be
better if all of the devs and users join the discussion of which kinds of
features are more important and urgent.

We have already listed some features from different aspects which you can
find below, however it is not the ultimate plan. We appreciate any
suggestions from the community, either on the functionalities or
performance improvements, etc. Would be great to have the following
information if you want to suggest to add some features:

-
- Feature description: 
- Benefits of the feature: 
- Use cases (optional): 
--

Features in my mind

1. Integration with most popular Python libraries
- fromPandas/toPandas API
   Description:
  Support to convert between Table and pandas.DataFrame.
   Benefits:
  Users could switch between Flink and Pandas API, for example, do
some analysis using Flink and then perform analysis using the Pandas API if
the result data is small and could fit into the memory, and vice versa.

- Support Scalar Pandas UDF
   Description:
  Support scalar Pandas UDF in Python Table API & SQL. Both the
input and output of the UDF is pandas.Series.
   Benefits:
  1) Scalar Pandas UDF performs better than row-at-a-time UDF,
ranging from 3x to over 100x (from pyspark)
  2) Users could use Pandas/Numpy API in the Python UDF
implementation if the input/output data type is pandas.Series

- Support Pandas UDAF in batch GroupBy aggregation
   Description:
   Support Pandas UDAF in batch GroupBy aggregation of Python Table
API & SQL. Both the input and output of the UDF is pandas.DataFrame.
   Benefits:
  1) Pandas UDAF performs better than row-at-a-time UDAF more than
10x in certain scenarios
  2) Users could use Pandas/Numpy API in the Python UDAF
implementation if the input/output data type is pandas.DataFrame

2. Fully support  all kinds of Python UDF
- Support Python UDAF(stateful) in GroupBy aggregation (NOTE: Please
give us some use case if you want this feature to be contained in the next
release)
  Description:
Support UDAF in GroupBy aggregation.
  Benefits:
Users could define and use Python UDAF and use it in GroupBy
aggregation. Without it, users have to use Java/Scala UDAF.

- Support Python UDTF
  Description:
   Support  Python UDTF in Python Table API & SQL
  Benefits:
Users could define and use Python UDTF in Python Table API & SQL.
Without it, users have to use Java/Scala UDTF.

3. Debugging and Monitoring of Python UDF
   - Support User-Defined Metrics
 Description:
   Allow users to define user-defined metrics and global job parameters
with Python UDFs.
 Benefits:
   UDF needs metrics to monitor some business or technical indicators,
which is also a requirement for UDFs.

   - Make the log level configurable
 Description:
   Allow users to config the log level of Python UDF.
 Benefits:
   Users could configure different log levels when debugging and
deploying.

4. Enrich the Python execution environment
   - Docker Mode Support
 Description:
 Support running python UDF in docker workers.
 Benefits:
     Support various deployments to meet more users' requirements.

5. Expand the usage scope of Python UDF
   - Support to use Python UDF via SQL client
 Description:
 Support to register and use Python UDF via SQL client
 Benefits:
 SQL client is a very important interface for SQL users. This
feature allows SQL users to use Python UDFs via SQL client.

   - Integrate Python UDF with Notebooks
 Description:
 Such as Zeppelin, etc (Especially Python dependencies)

   - Support to register Python UDF into catalog
  Description:
  Support to register Python UDF into catalog
  Benefits:
  1)Catalog is the centralized place to manage metadata such as
tables, UDFs, etc. With it, users could register the UDFs once and use it
anywhere.
  2) It's an important part of the SQL functionality. If Python
UDFs are not supported to be registered and used in catalog, Python UDFs
could not be shared between jobs.

6. Performance Improvements of Python UDF
   - Cython improvements
  Description:
  Cython Improvements in coder & operations
  Benefits:
  Initial tests show that Cython will speed 3x+ in coder
serialization/deserialization.

7. Add Python ML API
   - Add Python ML Pipeline API
 Description:
 Align Python ML Pipeline API with Java/Scala
 Benefits:
   1) Currently, we already have the Pipeline APIs for ML. It would be
good to 

Re: Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread vino yang
Hi Utopia,

IMO, your analysis is correct.

Best,
Vino

Utopia wrote on Thu, Dec 19, 2019 at 12:44 AM:

> Hi Vino,
>
> Maybe it is due to the type of window. What I used is
> ProcessingTimeSessionWindows, while keyed state is scoped to *window and
> key*. The window changes, so the ValueState is different.
>
> Best  regards
> Utopia
> On Dec 18, 2019 at 22:30 +0800, Utopia wrote:
>
> Hi Vino,
>
> Thanks for your reply !
>
> The key of my input data is same value. So I think there is only one
> partition.
>
> And why can I sometimes get the value stored in the ValueState before
> update?
>
> before update value : 3
>>
>> after update value: 4
>>
>>
> What's more, how can I store the previous value so that I can get the
> value when the next element comes in and invokes the onElement method?
>
>
>
> Best  regards
> Utopia
> On Dec 18, 2019 at 21:57 +0800, vino yang wrote:
>
> Hi Utopia,
>
> The behavior may be correct.
>
> First, the default value is null. That is the expected value.
> `ValueStateDescriptor` has multiple constructors, some of which let you
> specify a default value. However, those constructors are deprecated, and
> the doc does not recommend them. [1] For the other constructors, which
> cannot specify default values, the default is null.
>
> Second, before the window there is a `keyBy` operation. It will partition
> your data. For each partition, the default value state is null.
>
> Best,
> Vino
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/index.html?org/apache/flink/api/common/state/ValueStateDescriptor.html
>
> Utopia wrote on Wed, Dec 18, 2019 at 7:20 PM:
>
>> Hi,
>>
>> I want to get the last value stored in ValueState when processing element
>> in Trigger.
>>
>> But as the log shows, sometimes I can get the value, sometimes not.
>>
>> Only one key in my data(SensorReading).
>>
>> ValueState:
>>
>> class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {
>>
>>   private lazy val descriptor = new ValueStateDescriptor[Long]("desc", 
>> classOf[Long])
>>
>>   var value = 1
>>
>>   override def onElement( r: SensorReading, timestamp: Long, window: 
>> TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
>>
>> println("before update value : " + 
>> ctx.getPartitionedState(descriptor).value())
>>
>> ctx.getPartitionedState(descriptor).update(value)
>>
>> value += 1
>>
>> println("after update value: " + 
>> ctx.getPartitionedState(descriptor).value())
>>
>> ctx.registerProcessingTimeTimer(window.maxTimestamp)
>> TriggerResult.CONTINUE
>>   }
>>
>>   override def onEventTime(time: Long, window: TimeWindow, ctx: 
>> Trigger.TriggerContext) = TriggerResult.CONTINUE
>>
>>   override def onProcessingTime(time: Long, window: TimeWindow, ctx: 
>> Trigger.TriggerContext) = TriggerResult.FIRE
>>
>>   override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit 
>> = {
>> ctx.deleteProcessingTimeTimer(window.maxTimestamp)
>>   }
>>
>>   override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): 
>> Unit = {
>> val windowMaxTimestamp = window.maxTimestamp
>> if (windowMaxTimestamp > ctx.getCurrentProcessingTime) 
>> ctx.registerProcessingTimeTimer(windowMaxTimestamp)
>>   }
>>
>>   override def canMerge: Boolean = true
>>
>> }
>>
>>
>> Main process:
>>
>> object MyCustomWindows {
>>
>>   def main(args: Array[String]): Unit = {
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> env.getConfig.setAutoWatermarkInterval(1000L)
>>
>> val sensorData: DataStream[SensorReading] = env
>>   .addSource(new SensorSource)
>>   .assignTimestampsAndWatermarks(new SensorTimeAssigner)
>>
>> val countsPerThirtySecs = sensorData
>>   .keyBy(_.id)
>>   .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
>>   .trigger(new ProcessingTimeTrigger)
>>   .process(new CountFunction)
>>
>> env.execute()
>>   }
>> }
>>
>>
>> Log results:
>>
>> before update value : null
>> after update value: 1
>> before update value : null
>> after update value: 2
>> before update value : null
>> after update value: 3
>> before update value : 3
>> after update value: 4
>> before update value : null
>> after update value: 5
>> before update value : null
>> after update value: 6
>> before update value : null
>> after update value: 7
>> before update value : null
>> after update value: 8
>> before update value : null
>> after update value: 9
>> before update value : 9
>> after update value: 10
>>
>>
>>
>> Best  regards
>> Utopia
>>
>


Re: Flink cross-cluster Kerberos authentication issue

2019-12-18 Thread 李现
Do you mean configuring the two settings, keytab and principal?

Leslie Yuen wrote on Thu, Dec 19, 2019 at 08:35:

>
> I ran into the same problem. The final solution was simply to configure the flink-conf.yaml file properly on the cluster where the application runs; no extra authentication code is needed in the application. What I verified was consuming from and writing to Kafka with Kerberos authentication.
>
> > On Dec 18, 2019, at 23:05, 李现 wrote:
> >
> > Hello everyone,
> >    The Flink cluster cannot pass Kerberos authentication across clusters.
> >    Cluster 1: the Flink cluster, without Kerberos authentication
> >    Cluster 2: a Hadoop 2.6.0 cluster, with Kerberos authentication
> >
> > Data is written from cluster 1 to HDFS on cluster 2 using the BucketingSink, and it cannot pass Kerberos authentication. With the configuration from the official documentation, authentication works if everything is within one cluster.
> > At the moment I have no idea how to proceed; any pointers from someone with experience would be greatly appreciated.
>


Re: Flink cross-cluster Kerberos authentication issue

2019-12-18 Thread 李现
I have tried that approach. If the Flink conf file is configured on the application cluster, the Flink job initialization and the checkpoint storage still need to interact with the local HDFS, and that also runs into a problem: the client is "security" while the server (the local HDFS) is "simple", which also throws an exception.

Leslie Yuen wrote on Thu, Dec 19, 2019 at 08:35:

>
> I ran into the same problem. The final solution was simply to configure the flink-conf.yaml file properly on the cluster where the application runs; no extra authentication code is needed in the application. What I verified was consuming from and writing to Kafka with Kerberos authentication.
>
> > On Dec 18, 2019, at 23:05, 李现 wrote:
> >
> > Hello everyone,
> >    The Flink cluster cannot pass Kerberos authentication across clusters.
> >    Cluster 1: the Flink cluster, without Kerberos authentication
> >    Cluster 2: a Hadoop 2.6.0 cluster, with Kerberos authentication
> >
> > Data is written from cluster 1 to HDFS on cluster 2 using the BucketingSink, and it cannot pass Kerberos authentication. With the configuration from the official documentation, authentication works if everything is within one cluster.
> > At the moment I have no idea how to proceed; any pointers from someone with experience would be greatly appreciated.
>


Re: Flink cross-cluster Kerberos authentication issue

2019-12-18 Thread Leslie Yuen
I ran into the same problem. The final solution was simply to configure the flink-conf.yaml file properly on the cluster where the application runs; no extra authentication code is needed in the application. What I verified was consuming from and writing to Kafka with Kerberos authentication.
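
For reference, the flink-conf.yaml keys involved are roughly the following
(the path, principal and contexts below are placeholders):

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@YOUR.REALM
security.kerberos.login.contexts: Client,KafkaClient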

> On Dec 18, 2019, at 23:05, 李现 wrote:
> 
> Hello everyone,
>    The Flink cluster cannot pass Kerberos authentication across clusters.
>    Cluster 1: the Flink cluster, without Kerberos authentication
>    Cluster 2: a Hadoop 2.6.0 cluster, with Kerberos authentication
>    Data is written from cluster 1 to HDFS on cluster 2 using the BucketingSink, and it cannot pass Kerberos authentication. With the configuration from the official documentation, authentication works if everything is within one cluster.
> At the moment I have no idea how to proceed; any pointers from someone with experience would be greatly appreciated.


Need guidance on a use case

2019-12-18 Thread Eva Eva
Hi Team,

I'm trying Flink for the first time and encountered an issue that I would
like to discuss and understand if there is a way to achieve my use case
with Flink.

*Use case:* I need to perform unbounded stream joins on multiple data
streams by listening to different Kafka topics. I have a scenario to join a
column in a table with multiple columns in another table by avoiding
duplicate joins. The main concern is that I'm not able to avoid duplicate
joins.

*Issue: *Given the nature of data, it is possible to have updates over
time, sent as new messages since Kafka is immutable. For a given key I
would like to perform join only on the latest message, whereas currently
Flink performs join against all messages with the key (this is what I'm
calling the duplicate joins issue).
Example: Say I have two Kafka streams "User" and "Task". And I want to join
"User" with multiple columns in "Task".
Join "UserID" in "User" with "PrimaryAssignee", "SecondaryAssignee" and
"Manager" in "Task".

Assuming I created and registered DataStreams.
Below is my query:

  SELECT * FROM Task t
   LEFT JOIN User ua ON t.PrimaryAssignee = ua.UserID
   LEFT JOIN User ub ON t.SecondaryAssignee = ub.UserID
   LEFT JOIN User uc ON t.Manager = uc.UserID

Say I have 5 different messages in Kafka with UserID=1000, I don't want to
perform 5 joins instead I want to perform join with the only latest message
with UserID=1000. Is there any way to achieve this without using Temporal
Table Functions?

*I cannot use Temporal Table Functions because of below reasons:*
1. I need to trigger JOIN operation for every new message in Kafka. Whereas
new messages in Temporal Table don't trigger JOIN operation.
2. I need to perform LEFT OUTER JOINS, whereas Temporal Table can only be
used for INNER JOINS
3. From what I understand, JOIN in Temporal Table can only be performed
using Primary key, so I won't be able to Join more than one key.


Could someone please help me with this? Please let me know if any of the
information is not clear or if more details are needed.

 If this is not the correct email id, could you please point me to the
correct one.


Thanks in advance!


Re: MiniCluster with ProcessingTimeTrigger

2019-12-18 Thread John Morrow
Thanks Biao!

I tried slowing down the input stream by replacing the env.fromCollection() 
with a custom SourceFunction (below) which drip feeds the data a bit slower. By 
the way, in my real scenario the datasource for the pipeline will be a RabbitMQ 
source.


I do get better results, but it seems like a timing issue still exists:

  +-- StreamTest [OK]
  | +-- testPipelineWithProcessingTimeTrigger() 10480 ms [X] expected: <10> but 
was: <6>
  | '-- testPipelineWithCountTrigger() [OK]


I can probably play around with the window-size & sleep times below and get my 
tests to pass but I'm more concerned if there's a potential race 
condition/coordination step, outside of the MiniCluster test environment, that 
I should be dealing with.

My pipeline is broken into two parts: pipeA & pipeB. I've done this because the 
2nd part has an async operator so it needs to be at the start of a chain. For a 
non-MiniCluster environment, would it be possible for records to flow through 
pipeA and not reach pipeB, as I'm seeing with the MiniCluster? i.e. is there 
something I need to do to explicitly connect/sync pipeA & pipeB before calling 
env.execute(), besides the fact that:

pipeB = AsyncDataStream.unorderedWait(pipeA, ...


Thanks!
John.




public class StreamTest {

  private static class DripFeed extends RichSourceFunction<Integer> {

private volatile boolean isRunning = false;
private final int inputSize;

public DripFeed(int inputSize) {
  this.inputSize = inputSize;
}

@Override
public void open(Configuration parameters) {
  isRunning = true;
}

@Override
    public void run(SourceContext<Integer> ctx) throws Exception {
      List<Integer> listOfNumbers = IntStream.rangeClosed(1, 
inputSize).boxed().collect(Collectors.toList());
      Iterator<Integer> iterator = listOfNumbers.iterator();
  while (isRunning && iterator.hasNext()) {
try {
  Thread.sleep(100L);
} catch (InterruptedException e) {
  System.out.println();
}
ctx.collect(iterator.next());
  }
  try {
Thread.sleep(1000L);
  } catch (InterruptedException e) {
System.out.println();
  }
}

@Override
public void cancel() {
  isRunning = false;
}

  }

  @Test // :)
  @Tag("unit")
  public void testPipelineWithCountTrigger() throws Exception {
runPipeline(10, CountTrigger.of(10));
  }

  @Test // :(
  @Tag("unit")
  public void testPipelineWithProcessingTimeTrigger() throws Exception {
runPipeline(10, ProcessingTimeTrigger.create());
  }


  private void runPipeline(int inputSize, Trigger<Object, TimeWindow> trigger) 
throws Exception {

MiniClusterWithClientResource miniCluster = new 
MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(1)
.setNumberTaskManagers(1)
.setShutdownTimeout(org.apache.flink.api.common.time.Time.of(1, 
TimeUnit.DAYS))
.build()
);
miniCluster.before();

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
CollectSink.values.clear();

List<Integer> listOfNumbers = IntStream.rangeClosed(1, 
inputSize).boxed().collect(Collectors.toList());

// 1st half of pipeline
//DataStream> pipeA = env.fromCollection(listOfNumbers)
//DataStream<List<Integer>> pipeA = env.fromCollection(listOfNumbers)
DataStream<List<Integer>> pipeA = env.addSource(new 
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)))

...(same as before...)




From: Biao Liu 
Sent: Tuesday 17 December 2019 21:50
To: John Morrow 
Cc: user 
Subject: Re: MiniCluster with ProcessingTimeTrigger

Hi John,

The root cause is the collection source exits too fast. The window would also 
exit without being triggered.

You could verify that by waiting a second before releasing the window. For 
example, insert a map operator between source and window operator. Blocking a 
second or more in the "close" method of this map operator. You will see the 
window would work well.

Thanks,
Biao /'bɪ.aʊ/



On Wed, 18 Dec 2019 at 06:24, John Morrow <johnniemor...@hotmail.com> wrote:
Hi All,

I'm trying to test a pipeline that consists of two Flink tasks with a 
MiniCluster. The 1st task has a WindowAll operator which groups items into 
batches every second, and the 2nd task does an async operation with each batch 
and flatMaps the result.

I've whittled it down to the bare bones below. There are two tests:

  *   testPipelineWithCountTrigger - this one works fine 
  *   testPipelineWithProcessingTimeTrigger - this one doesn't give any output 

It seems like a timing issue. If I step through the failing one slowly I can 
see that the ProcessingTimeTrigger's onElement/onProcessingTime/clear methods 
do get called, and the asyncInvoke method also gets called, but when I run it 
the 2nd test fails as it produces no output. I've tried setting the MiniCluster 
timeout to 1 day, the same with my 

Re: MapState with List Type for values

2019-12-18 Thread Yun Tang
Hi Aaron

There is a runtime key, which acts in the keyBy, and a map-key in your map 
state. Generally speaking, the runtime key is not the same as the map-key. If 
you could store the record type (the emoji in your example) as the map-key in 
your state, no list state is necessary. The basic idea is a bit like the join 
implemented in the flink-table API [1]. I think you could refer to the 
implementation of the SQL join and how it uses state to find more hints.

[1] 
https://github.com/apache/flink/blob/4fc55c747d001b75ba4652e867c98a5e2d62fd69/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java#L182
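
In code, the idea looks roughly like this (a sketch; the names are placeholders
and it assumes the queued payload can be reconstructed from the map key, as
described above):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;

// Declared in open() of a keyed RichFlatMapFunction / KeyedProcessFunction.
MapStateDescriptor<String, Integer> descriptor =
    new MapStateDescriptor<>("pending-counts", String.class, Integer.class);
MapState<String, Integer> pendingCounts = getRuntimeContext().getMapState(descriptor);

// When a record arrives that must wait for join key k: just bump a counter.
Integer waiting = pendingCounts.get(k);
pendingCounts.put(k, waiting == null ? 1 : waiting + 1);

// When the awaited record for k arrives: emit that many joined results, then clear.
Integer queued = pendingCounts.get(k);
if (queued != null) {
    // ... emit the joined output `queued` times ...
    pendingCounts.remove(k);
}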

Best
Yun Tang


From: Aaron Langford 
Sent: Thursday, December 19, 2019 2:22
To: Yun Tang 
Cc: user@flink.apache.org 
Subject: Re: MapState with List Type for values

So the suggestion as I read it is to have some kind of shared queue for all 
waiting records. This allows for use of ListState, and cheap appends.

Then the map state counts how many of each record is queued.

When I finally get a record that allows me to remove elements from the queue, I 
can iterate through the ListState until I have flushed N elements downstream, 
where N is the number from the mapstate.

Does that read back like what you were suggesting?


The question then becomes how to remove those flushed elements from the 
ListState. ListState doesn't support removing items by index, or like a linked 
list.

In which case, maybe this calls for two ListState objects. Each time I need to 
flush elements, I can move elements that don't match from one ListState to the 
other.

I scan through all records in ListState each time I receive a record that has 
elements waiting...


I'll have to think through this and see if that 2 ListState approach is 
feasible.

Is there anyone working on something like a TreeState, memtable/SortedList 
state, or AppendingState that supports linked-list-like remove operations? Some 
of these things might be useful in helping along this implementation.

Aaron

On Wed, Dec 18, 2019 at 8:53 AM Yun Tang <myas...@live.com> wrote:
Hi Aaron

You cannot set up a map state whose value is a list state, but you can set its 
value as a list. However, I think that would also suffer in 
serialize/deserialize when appending the list as value.

What is the KEY in your map state? If you could use the record type (the A/B 
from your example) as your KEY, you could act like this:

After keyBy, we would first process an A, then we get map state <A, 1>. Then we 
also process another A, and we would get <A, 2>. Lastly, we process a B and 
look it up in the map to learn that we have already queued two As. At this 
point we could produce {A, B} and set the previous map state to <A, 1>. If you 
follow this logic, the previous serialize/deserialize of the Seq could be 
greatly reduced.

Best
Yun Tang

From: Aaron Langford <aaron.langfor...@gmail.com>
Sent: Wednesday, December 18, 2019 6:47
To: user@flink.apache.org
Subject: MapState with List Type for values

Hello Flink Community,

I have a question about using MapState with lists as values. Here is a 
description of the use case:

I have an operator over a keyed stream where for each record that comes, it 
needs to look into some state to determine if a value has arrived or not. If 
the value has not arrived yet, the record needs to be en-queued for when the 
value will eventually arrive. When that value does arrive, the queued records 
need to be flushed, and associated list state cleared.

Here's a representation of the stream (A, B, C, D stand for four record types):
-
A A A B C C C D ...
-
Where A must wait for at least 1 B to be output (otherwise be queued) and C 
must wait for D to be output (otherwise be queued). If you can't tell, this is 
basically/sort of a join without a window.

The output should be something abstractly like this:

-
{A,B}{A,B}{A,B}{C,D}{C,D}{C,D}...
-

Many records might be en-queued while waiting for a given value. Many records 
may be waiting for many different values, but any record that is en-queued will 
only be waiting for a record of one type to show up.

Records on this stream are keyed by some shared parent key, and I have reasons 
to avoid keying the stream on the "join key" as it were. Namely I'm getting a 
CDC stream with a lot of different tables, and I want to avoid a topology with 
N operators for N different tables if I can.

If I lean on MapState> to get this done for me, then my job 
suffers considerably in terms of performance. I believe one of the biggest 
bottlenecks is that for each time I need to interact with a Seq (like 
for appends), I must deserialize the 

Apache Flink - Flink Metrics - How to distinguish b/w metrics for two job manager on the same host

2019-12-18 Thread M Singh
Hi:
I am using AWS EMR with a Flink application and two of the job managers are 
running on the same host.  I am looking at the metrics documentation (Apache 
Flink 1.9 Documentation: Metrics) and see the following:

   - metrics.scope.jm
      - Default: <host>.jobmanager
      - Applied to all metrics that were scoped to a job manager.

...
List of all Variables

   - JobManager: <host>
   - TaskManager: <host>, <tm_id>
   - Job: <job_id>, <job_name>
   - Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, 
<subtask_index>
   - Operator: <operator_id>, <operator_name>, <subtask_index>

My question is: is there a way to distinguish b/w the two job managers? I see 
only the <host> variable for JobManager and since the two are running on the 
same host, the value is the same.  Is there any other variable that I can use 
to distinguish the two?

For taskmanager I have taskmanager id but am not sure about the job manager.
Thanks
Mans


Re: MapState with List Type for values

2019-12-18 Thread Aaron Langford
So the suggestion as I read it is to have some kind of shared queue for all
waiting records. This allows for use of ListState, and cheap appends.

Then the map state counts how many of each record is queued.

When I finally get a record that allows me to remove elements from the
queue, I can iterate through the ListState until I have flushed N elements
downstream, where N is the number from the mapstate.

Does that read back like what you were suggesting?


The question then becomes how to remove those flushed elements from the
ListState. ListState doesn't support removing items by index, or like a
linked list.

In which case, maybe this calls for two ListState objects. Each time I need
to flush elements, I can move elements that don't match from one ListState
to the other.

I scan through all records in ListState each time I receive a record that
has elements waiting...


I'll have to think through this and see if that 2 ListState approach is
feasible.

Is there anyone working on something like a TreeState, memtable/SortedList
state, or AppendingState that supports linked-list-like remove operations?
Some of these things might be useful in helping along this implementation.
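
Roughly what I have in mind for the two-ListState variant (a sketch; `Event`
and `getJoinKey()` are placeholders for whatever the CDC records look like):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.util.Collector;

// Fields of a keyed rich function; both initialized from ListStateDescriptors in open().
private ListState<Event> queue;    // records currently waiting
private ListState<Event> scratch;  // temporary home for records that keep waiting

private void flushFor(String arrivedKey, Collector<Event> out) throws Exception {
  for (Event e : queue.get()) {
    if (arrivedKey.equals(e.getJoinKey())) {
      out.collect(e);              // its awaited value has arrived: emit it
    } else {
      scratch.add(e);              // still waiting: keep it
    }
  }
  // Copy the survivors back so `queue` remains the live list.
  queue.clear();
  for (Event e : scratch.get()) {
    queue.add(e);
  }
  scratch.clear();
}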

Aaron

On Wed, Dec 18, 2019 at 8:53 AM Yun Tang  wrote:

> Hi Aaron
>
> You cannot set up a map state whose value is a list state, but you can set
> its value as a list. However, I think that would also suffer in
> serialize/deserialize when appending the list as value.
>
> What is the KEY in your map state? If you could use the record type (the A/B
> from your example) as your KEY, you could act like this:
>
> After keyBy, we would first process an A, then we get map state <A, 1>.
> Then we also process another A, and we would get <A, 2>. Lastly, we process
> a B and look it up in the map to learn that we have already queued two As.
> At this point we could produce {A, B} and set the previous map state to
> <A, 1>. If you follow this logic, the previous
> serialize/deserialize of the Seq could be greatly reduced.
>
> Best
> Yun Tang
> --
> *From:* Aaron Langford 
> *Sent:* Wednesday, December 18, 2019 6:47
> *To:* user@flink.apache.org 
> *Subject:* MapState with List Type for values
>
> Hello Flink Community,
>
> I have a question about using MapState with lists as values. Here is a
> description of the use case:
>
> I have an operator over a keyed stream where for each record that comes,
> it needs to look into some state to determine if a value has arrived or
> not. If the value has not arrived yet, the record needs to be en-queued for
> when the value will eventually arrive. When that value does arrive, the
> queued records need to be flushed, and associated list state cleared.
>
> Here's an emoji representation of the stream:
> -
> 咽濾濾濾 ...
> -
> Where  must wait for at least 1 咽 to be output (otherwise be queued)
> and 濾 must wait for to be output (otherwise be queued). If you can't
> tell, this is basically/sort of a join without a window.
>
> The output should be something abstractly like this:
>
> -
> {,咽}{,咽}{,咽}{濾,}{濾,}{濾,}...
> -
>
> Many records might be en-queued while waiting for a given value. Many
> records may be waiting for many different values, but any record that is
> en-queued will only be waiting for a record of one type to show up.
>
> Records on this stream are keyed by some shared parent key, and I have
> reasons to avoid keying the stream on the "join key" as it were. Namely I'm
> getting a CDC stream with a lot of different tables, and I want to avoid a
> topology with N operators for N different tables if I can.
>
> If I lean on MapState> to get this done for me, then my
> job suffers considerably in terms of performance. I believe one of the
> biggest bottlenecks is that for each time I need to interact with a
> Seq (like for appends), I must deserialize the entire list.
>
> Is there a way to set up a MapState whose value is a ListState? Or is
> there any guidance for how I might serialize/deserialize a list type in the
> MapState in such a way that appends aren't so expensive? Open to other
> suggestions/approaches as well.
>
> Aaron
>


How to convert retract stream to dynamic table?

2019-12-18 Thread James Baker
Hi!
I've been looking at Flink for the last few days and have very much appreciated 
the concept of Dynamic Tables, it solves a lot of my needs and handles a lot of 
the complex state tracking that is otherwise painful. I have a question about 
the composability of the system which the docs don't answer.

The docs use the example of 'SELECT user, COUNT(url) as cnt FROM clicks GROUP 
BY user', where clicks is a stream coming in of user and the url they've 
clicked.

From such a Table, I can then get a retract stream written into an external 
system, perhaps outputting (true, User1, 1), ..., (true, User1, 2) indicating 
that User1's clicked on something.

Is there an idiomatic way to convert a retract stream into a semantically 
equivalent table?
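
For reference, a minimal Java sketch of how the retract stream described above is
produced (Flink 1.9 Table API; it assumes a clicks table with user/url columns has
already been registered on the table environment):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Assumption: a "clicks" table (user, url) was registered on tEnv beforehand.
        Table counts = tEnv.sqlQuery("SELECT `user`, COUNT(url) AS cnt FROM clicks GROUP BY `user`");

        // Each element is (flag, row): flag == true adds or updates a row,
        // flag == false retracts a row that was emitted earlier.
        DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(counts, Row.class);
        retractStream.print();  // e.g. (true,User1,1), (false,User1,1), (true,User1,2)

        env.execute();
    }
}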

Thanks,
James


Operators resource requirements on K8s Flink session cluster

2019-12-18 Thread Michaël Melchiore
Hello,

I plan to run topologies on a Flink session cluster on Kubernetes.
In my topologies, operators will have varying resource requirements in terms
of CPU and RAM.
How can I make this information available from Flink to Kubernetes, so that the
latter takes it into account to optimize its deployments?

I am trying to achieve something similar to the Apache Storm/Trident Resource
Aware Scheduler.

Kind regards,

Michaël


Re: MapState with List Type for values

2019-12-18 Thread Yun Tang
Hi Aaron

You cannot set up a map state whose value is a list state, but you can set its 
value as a list. However, I think that would also suffer in 
serialize/deserialize when appending the list as value.

What is the KEY in your map state? If you could use emoji as your KEY, and you 
could act like this:

After keyby, we would first process a ,  then we get map state as < , 1>. 
Then we also process another , and we would get  < , 2>. Lastly, we process a 
咽 and try to search the map to know we already queued two . In this time we 
could produce {, 咽} and set previous map state as < , 1> . If you could 
follow this logic, the previous serialize/deserialize of Seq could be 
greatly reduced.
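
A minimal Java sketch of this counting idea (the String "type" values stand in for the
emoji, and all names are assumptions); the point is that only a small integer is read
and written per element, never a whole list:

// Called once per record; matchingType is the type this record pairs with.
void onElement(String type, String matchingType,
               MapState<String, Integer> pendingCounts,
               Collector<String> out) throws Exception {
    Integer waiting = pendingCounts.get(matchingType);
    if (waiting != null && waiting > 0) {
        out.collect("{" + type + "," + matchingType + "}");       // pair with one queued record
        if (waiting == 1) {
            pendingCounts.remove(matchingType);
        } else {
            pendingCounts.put(matchingType, waiting - 1);
        }
    } else {
        Integer queued = pendingCounts.get(type);
        pendingCounts.put(type, queued == null ? 1 : queued + 1); // just bump a counter, no list append
    }
}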

Best
Yun Tang

From: Aaron Langford 
Sent: Wednesday, December 18, 2019 6:47
To: user@flink.apache.org 
Subject: MapState with List Type for values

Hello Flink Community,

I have a question about using MapState with lists as values. Here is a 
description of the use case:

I have an operator over a keyed stream where for each record that comes, it 
needs to look into some state to determine if a value has arrived or not. If 
the value has not arrived yet, the record needs to be en-queued for when the 
value will eventually arrive. When that value does arrive, the queued records 
need to be flushed, and associated list state cleared.

Here's an emoji representation of the stream:
-
咽濾濾濾 ...
-
Where  must wait for at least 1 咽 to be output (otherwise be queued) and 濾 
must wait for to be output (otherwise be queued). If you can't tell, this is 
basically/sort of a join without a window.

The output should be something abstractly like this:

-
{,咽}{,咽}{,咽}{濾,}{濾,}{濾,}...
-

Many records might be en-queued while waiting for a given value. Many records 
may be waiting for many different values, but any record that is en-queued will 
only be waiting for a record of one type to show up.

Records on this stream are keyed by some shared parent key, and I have reasons 
to avoid keying the stream on the "join key" as it were. Namely I'm getting a 
CDC stream with a lot of different tables, and I want to avoid a topology with 
N operators for N different tables if I can.

If I lean on MapState> to get this done for me, then my job 
suffers considerably in terms of performance. I believe one of the biggest 
bottlenecks is that for each time I need to interact with a Seq (like 
for appends), I must deserialize the entire list.

Is there a way to set up a MapState whose value is a ListState? Or is there any 
guidance for how I might serialize/deserialize a list type in the MapState in 
such a way that appends aren't so expensive? Open to other 
suggestions/approaches as well.

Aaron


Re: Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread Utopia
Hi Vino,

Maybe it is due to the type of window. What I used is 
ProcessingTimeSessionWindows, while the keyed state in the trigger is scoped to the window and the key. 
The window changes, so the ValueState is a different one each time.

Best  regards
Utopia
On Dec 18, 2019 at 22:30 +0800, Utopia wrote:
> Hi Vino,
>
> Thanks for your reply !
>
> The key of my input data is always the same value, so I think there is only one 
> partition.
>
> And why can I sometimes get the value stored in the ValueState before the update?
> > > > > before update value : 3
> > > > > after update value: 4
>
> What’s more, how can I store the previous value so that I can get it 
> when the next element comes in and the onElement method is invoked?
>
>
>
> Best  regards
> Utopia
On Dec 18, 2019 at 21:57 +0800, vino yang wrote:
> > Hi Utopia,
> >
> > The behavior may be correct.
> >
> > First, the default value is null. It's the correct value. 
> > `ValueStateDescriptor` has multiple constructors, some of them can let you 
> > specify a default value. However, these constructors are deprecated. And 
> > the doc does not recommend them.[1] For the other constructors which can 
> > not specify default values, it would be null.
> >
> > Second, before the window, there is a `keyBy` operation. it will partition 
> > your data. For each partition, the default value state is null.
> >
> > Best,
> > Vino
> >
> > [1]: 
> > https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/index.html?org/apache/flink/api/common/state/ValueStateDescriptor.html
> >
> > > Utopia wrote on Wed, Dec 18, 2019 at 19:20:
> > > > Hi,
> > > >
> > > > I want to get the last value stored in ValueState when processing 
> > > > element in Trigger.
> > > >
> > > > But as the log shows that sometimes I can get the value, sometimes not.
> > > >
> > > > Only one key in my data(SensorReading).
> > > >
> > > > ValueState:
> > > > class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {
> > > >
> > > > private lazy val descriptor = new ValueStateDescriptor[Long]("desc", 
> > > > classOf[Long])
> > > > var value = 1
> > > > override def onElement( r: SensorReading, timestamp: Long, window: 
> > > > TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
> > > >
> > > >   println("before update value : " + 
> > > > ctx.getPartitionedState(descriptor).value())
> > > >
> > > >ctx.getPartitionedState(descriptor).update(value)
> > > >
> > > >value += 1
> > > >
> > > >println("after update value: " + 
> > > > ctx.getPartitionedState(descriptor).value())
> > > >
> > > >ctx.registerProcessingTimeTimer(window.maxTimestamp)
> > > >TriggerResult.CONTINUE
> > > > }
> > > >
> > > > override def onEventTime(time: Long, window: TimeWindow, ctx: 
> > > > Trigger.TriggerContext) = TriggerResult.CONTINUE
> > > >
> > > > override def onProcessingTime(time: Long, window: TimeWindow, ctx: 
> > > > Trigger.TriggerContext) = TriggerResult.FIRE
> > > >
> > > > override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): 
> > > > Unit = {
> > > >ctx.deleteProcessingTimeTimer(window.maxTimestamp)
> > > >  }
> > > >
> > > > override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): 
> > > > Unit = {
> > > >val windowMaxTimestamp = window.maxTimestamp
> > > >if (windowMaxTimestamp > ctx.getCurrentProcessingTime) 
> > > > ctx.registerProcessingTimeTimer(windowMaxTimestamp)
> > > >  }
> > > >
> > > > override def canMerge: Boolean = true
> > > >
> > > > }
> > > >
> > > > Main process:
> > > > object MyCustomWindows {
> > > >
> > > > def main(args: Array[String]): Unit = {
> > > >
> > > >val env = StreamExecutionEnvironment.getExecutionEnvironment
> > > >env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
> > > >env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> > > >env.getConfig.setAutoWatermarkInterval(1000L)
> > > >
> > > >val sensorData: DataStream[SensorReading] = env
> > > >  .addSource(new SensorSource)
> > > >  .assignTimestampsAndWatermarks(new SensorTimeAssigner)
> > > >
> > > >val countsPerThirtySecs = sensorData
> > > >  .keyBy(_.id)
> > > > .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
> > > >  .trigger(new ProcessingTimeTrigger)
> > > >  .process(new CountFunction)
> > > >
> > > >env.execute()
> > > >  }
> > > > }
> > > >
> > > > Log results:
> > > >
> > > > > before update value : null
> > > > > after update value: 1
> > > > > before update value : null
> > > > > after update value: 2
> > > > > before update value : null
> > > > > after update value: 3
> > > > > before update value : 3
> > > > > after update value: 4
> > > > > before update value : null
> > > > > after update value: 5
> > > > > before update value : null
> > > > > after update value: 6
> > > > > before update value : null
> > > > > after update value: 7
> > > > > before update value : null
> > > > > after update value: 8
> > > > > before update value : null
> > > > > after update value: 9
> > > > > before update value : 9
> > > 

Re: Restore metrics on broadcast state after restart

2019-12-18 Thread Yun Tang
Hi Gaël

You can try initializeState [1] to initialize your metrics values from states 
when restoring from a checkpoint.

context.getOperatorStateStore().getBroadcastState()  could visit your restored 
broadcast state.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#checkpointedfunction
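
A minimal Java sketch of this approach; the rule values are plain Strings here, and the
descriptor and metric names are assumptions:

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class RuleMetricsFunction
        extends KeyedBroadcastProcessFunction<String, String, String, String>
        implements CheckpointedFunction {

    private final MapStateDescriptor<String, String> rulesDescriptor =
            new MapStateDescriptor<>("rules", String.class, String.class);

    private transient long knownRules;  // backing field read by the gauge

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().getMetricGroup().gauge("knownRules", (Gauge<Long>) () -> knownRules);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        if (context.isRestored()) {
            // Re-populate the metric backing field from the restored broadcast state.
            BroadcastState<String, String> rules =
                    context.getOperatorStateStore().getBroadcastState(rulesDescriptor);
            long count = 0;
            for (Object ignored : rules.entries()) {
                count++;
            }
            knownRules = count;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Nothing extra to snapshot: the broadcast state itself is checkpointed by Flink.
    }

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) {
        // normal element handling omitted
    }

    @Override
    public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
        ctx.getBroadcastState(rulesDescriptor).put(rule, rule);  // simplified: every broadcast element is a new rule
        knownRules++;
    }
}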

Best
Yun Tang


From: Gaël Renoux 
Sent: Tuesday, December 17, 2019 23:22
To: user 
Subject: Restore metrics on broadcast state after restart

Hi everyone

I have a KeyedBroadcastProcessFunction with a broadcast state (a bunch of 
rules), and I have set up a few gauge metrics on that state (things such as 
the number of known rules and the timestamp of the last rule received). However, I have 
an issue when the server restarts from a checkpoint or a savepoint: the metric 
values are not restored.

That's nothing anomalous: the fields used in the metrics are transient, not 
part of the state (I have followed this doc: 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#metric-types).
 The fields will be reset to the proper value in the next call to 
processBroadcastElement(), but that's not enough for my use case: rules updates 
aren't that frequent (it could be minutes or even hours before the next one). 
We can't have the metrics offline for that long.

Is there any way to reset those fields without waiting for the next messages to 
arrive? The open() method doesn't have access to the broadcast state, so I 
can't do it there. I could do it in processElement() (normal element are much 
more frequent than rules), but it's far from ideal:
- it would be done again and again for every single element received, which is 
overkill;
- it could only update the metric on the current subtask, not the others, so 
one subtask could lag behind.

Am I missing something here ? Is there any way to trigger a reset of the value 
when the broadcast state is reconstructed ?

Thanks for any help,
Gaël Renoux



Re: [DISCUSS] Drop vendor specific repositories from pom.xml

2019-12-18 Thread Robert Metzger
I guess we are talking about this profile [1] in the pom.xml?

+1 to remove.

I'm not sure if we need to rush this for the 1.10 release. The profile is
not doing us any harm at the moment.

[1] https://github.com/apache/flink/blob/master/pom.xml#L1035

On Wed, Dec 18, 2019 at 4:51 PM Till Rohrmann  wrote:

> Hi everyone,
>
> following the discussion started by Seth [1] I would like to discuss
> dropping the vendor specific repositories from Flink's parent pom.xml. As
> building Flink against a vendor specific Hadoop version is no longer needed
> (as it simply needs to be added to the classpath) and documented, I believe
> that the vendor specific repositories and the mapr profile have become
> obsolete. Moreover, users can still use vendor specific Hadoop versions if
> they configure their local maven to point to the respective repository [2].
> Flink's sources would simply no longer be shipped with this option.
>
> Are there any concerns about dropping the vendor specific repositories from
> pom.xml? I would like to make this change for the upcoming Flink 1.10
> release if possible.
>
> [1]
>
> https://lists.apache.org/thread.html/83afcf6c0d5d7a0a7179cbdac9593ebe7478b0dc548781bf9915a006%40%3Cdev.flink.apache.org%3E
> [2] https://maven.apache.org/guides/mini/guide-multiple-repositories.html
>
> Cheers,
> Till
>


Re: [Question] How to use different filesystem between checkpoint data and user data sink

2019-12-18 Thread Piotr Nowojski
Hi,

As Yang Wang pointed out, you should use the new plugins mechanism.

If it doesn’t work, first make sure that you are shipping/distributing the 
plugin jars correctly - that the plugins directory structure is correct on the 
client machine. Next make sure that the cluster has the same correct setup. 
This is especially true for the standalone/cluster execution modes. For yarn, 
mesos, docker the plugins dir should be shipped to the cluster by Flink itself, 
however Plugins support in yarn is currently semi broken [1]. This is already 
fixed, but waiting to be released in 1.9.2 and 1.10.

If it still doesn’t work, check the TaskManager logs to see which plugins/file systems 
are being loaded during startup. If none are, that's the problem.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-14382 


> On 18 Dec 2019, at 12:40, Yang Wang  wrote:
> 
> You could have a try the new plugin mechanism.
> Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then put 
> your filesystem related jars in it.
> Different plugins will be loaded by separate classloader to avoid conflict.
> 
> 
> Best,
> Yang
> 
> vino yang <yanghua1...@gmail.com> wrote on Wed, Dec 18, 2019 at 18:46:
> Hi ouywl,
> 
> >>Thread.currentThread().getContextClassLoader();
> What does this statement mean in your program?
> 
> In addition, can you share your implementation of the customized file system 
> plugin and the related exception?
> 
> Best,
> Vino
> 
> ouywl <ou...@139.com> wrote on Wed, Dec 18, 2019 at 16:59:
> Hi all,
> We have implemented a filesystem plugin for sink data to hdfs1, and the 
> yarn for flink running is used hdfs2. So when the job running, the jobmanager 
> use the conf of hdfs1 to create filesystem, the filesystem plugin  is 
> conflict with flink component. 
> We implemeted step:
>   1.  ‘FileSystemEnhance’ is implement from “FileSystem”
>   2.  ‘FileSystemFactoryEnhance’ is implement from 
> “FileSystemFactory”,add kerberos auth in ”FileSystemFactoryEnhance" 
>   3. Add a service entry. Create a file 
> META-INF/services/org.apache.flink.core.fs.FileSystemFactory which contains 
> the class name of “ FileSystemFactoryEnhance.class”
> 
> And  the job mainclass is :
>“ public static void main(String[] args) throws Exception{
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(60*1000);
> 
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> 
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.getConfig().enableSysoutLogging();
> 
> 
> Properties props = new Properties();
> props.put("bootstrap.servers", SERVERS);
> props.put("group.id ", GROUPID);
> props.put("enable.auto.commit", "true");
> // props.put("auto.commit.interval.ms ", 
> "1000");
> props.put("session.timeout.ms ", "3");
> props.put("auto.offset.reset", "latest");
> props.put("key.deserializer", 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName());
> props.put("value.deserializer", StringDeserializer.class.getName());
> FlinkKafkaConsumer010 consumer011 = new 
> FlinkKafkaConsumer010("zyf_test_2", new SimpleStringSchema(), props);
> DataStream source = env.addSource(consumer011).setParallelism(1);
> 
> source.print();
> Thread.currentThread().getContextClassLoader();
> 
> StreamingFileSink sink = StreamingFileSink
> .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"), new 
> SimpleStringEncoder<>("UTF-8"))
> .build();
> 
> source.addSink(sink);
> 
> env.execute();
> }”
> 
> And start the job, the jobmanager filesystem is error, the log means the 
> jobmananger use “FileSystemFactoryEnhance” filesystem and confict.
> 
> As the url 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems
>  
> 
>  how to avoid use “Thread.currentThread().getContextClassLoader()"
> 
> 
>   
> ouywl
> ou...@139.com
>  
> 



[DISCUSS] Drop vendor specific repositories from pom.xml

2019-12-18 Thread Till Rohrmann
Hi everyone,

following the discussion started by Seth [1] I would like to discuss
dropping the vendor specific repositories from Flink's parent pom.xml. As
building Flink against a vendor specific Hadoop version is no longer needed
(as it simply needs to be added to the classpath) and documented, I believe
that the vendor specific repositories and the mapr profile have become
obsolete. Moreover, users can still use vendor specific Hadoop versions if
they configure their local maven to point to the respective repository [2].
Flink's sources would simply no longer be shipped with this option.

Are there any concerns about dropping the vendor specific repositories from
pom.xml? I would like to make this change for the upcoming Flink 1.10
release if possible.

[1]
https://lists.apache.org/thread.html/83afcf6c0d5d7a0a7179cbdac9593ebe7478b0dc548781bf9915a006%40%3Cdev.flink.apache.org%3E
[2] https://maven.apache.org/guides/mini/guide-multiple-repositories.html

Cheers,
Till


Re: Re: How can Flink dynamically change window size and type?

2019-12-18 Thread 王双利
Is there an example? The more complex case is a control stream that governs the window size, implemented with connect.



王双利
 
From: 陈帅
Sent: 2019-12-18 22:51
To: user-zh@flink.apache.org
Subject: Re: How can Flink dynamically change window size and type?
Such requirements do exist in practice. The simple case is that the element itself carries the window-size setting; the more complex case is a control stream that governs the window size, implemented with connect.
 
LakeShen wrote on Wed, Dec 18, 2019 at 14:12:
 
> Try a custom Trigger and define your business-specific firing logic inside it.
>
> 陈帅 wrote on Sat, Dec 14, 2019 at 18:44:
>
> > Does Flink currently support dynamically changing the window size and type? For example, first aggregate over a 5-minute window and later change it to a 10-minute window.
> >
>


Cross-cluster Kerberos authentication problem with Flink

2019-12-18 Thread 李现
Hi all,
Our Flink cluster cannot pass Kerberos authentication across clusters.
Cluster 1: Flink cluster, without Kerberos authentication
Cluster 2: Hadoop 2.6.0 cluster, with Kerberos authentication
Data from cluster 1 is written to HDFS on cluster 2 using BucketingSink, and the Kerberos authentication fails. With the configuration from the official documentation, authentication works when everything runs inside a single cluster.
I have no idea how to proceed at the moment; any pointers from someone with experience would be greatly appreciated.


Re: Questions about taskmanager.memory.off-heap and taskmanager.memory.preallocate

2019-12-18 Thread Ethan Li
Thank you Vino for the information.

Best,
Ethan 

> On Dec 17, 2019, at 8:29 PM, vino yang  wrote:
> 
> Hi Ethan,
> 
> Share two things:
> 
> I have found "taskmanager.memory.preallocate" config option has been removed 
> in the master codebase.
> After researching git history, I found the description of 
> "taskmanager.memory.preallocate" was written by @Chesnay Schepler 
>   (from 1.8 branch). So maybe he can give more 
> context or information. Correct me, if I am wrong.
> Best,
> Vino.
> 
> Ethan Li <ethanopensou...@gmail.com> wrote on Wed, Dec 18, 2019 at 10:07:
> I didn’t realize we was not chatting in the mailing list :)
> 
> I think it’s wrong because it kind of says full GC is triggered by reaching 
> MaxDirectMemorySize. 
> 
> 
>> On Dec 16, 2019, at 11:03 PM, Xintong Song > > wrote:
>> 
>> Glad that helped. I'm also posting this conversation to the public mailing 
>> list, in case other people have similar questions.
>> 
>> And regarding the GC statement, I think the document is correct.
>> - Flink Memory Manager guarantees that the amount of allocated managed 
>> memory never exceed the configured capacity, thus managed memory allocation 
>> should not trigger OOM.
>> - When preallocation is enabled, managed memory segments are allocated and 
>> pooled by Flink Memory Manager, no matter there are tasks requesting them or 
>> not. The segments will not be deallocated until the cluster is shutdown.
>> - When preallocation is disabled, managed memory segments are allocated only 
>> when tasks requesting them, and destroyed immediately when tasks return them 
>> to the Memory Manager. However, what this statement trying to say is that, 
>> the memory is not deallocated directly when the memory segment is destroyed, 
>> but will have to wait until the GC to be truly released.
>> 
>> Thank you~
>> Xintong Song
>> 
>> 
>> On Tue, Dec 17, 2019 at 12:30 PM Ethan Li > > wrote:
>> Thank you very much Xintong! It’s much clear to me now. 
>> 
>> I am still on standalone cluster setup.  Before I was using 350GB on-heap 
>> memory on a 378GB box. I saw a lot of swap activities. Now I understand that 
>> it’s because RocksDB didn’t have enough memory to use, so OS forces JVM to 
>> swap. It can explain why the cluster was not stable and kept crashing.
>> 
>> Now that I put 150GB off-heap and 150GB on-heap, the cluster is more stable 
>> than before. I thought it was because GC was reduced because now we have 
>> less heap memory. Now I understand that it’s because I have 78GB memory 
>> available for rocksDB to use, 50GB more than before. And it explains why I 
>> don’t see swaps anymore. 
>> 
>> This makes sense to me now. I just have to set preallocation to false to use 
>> the other 150 GB off-heap memory for rocksDB and do some tuning on these 
>> memory configs. 
>> 
>> 
>> One thing I noticed is that in 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#taskmanager-memory-preallocate
>>  
>> 
>> 
>>  If this configuration is set to false cleaning up of the allocated off-heap 
>> memory happens only when the configured JVM parameter MaxDirectMemorySize is 
>> reached by triggering a full GC
>> 
>> I think this statement is not correct. GC is not trigged by reaching 
>> MaxDirectMemorySize. It will throw "java.lang.OutOfMemoryError: Direct 
>> buffer memory” if MaxDirectMemorySize is reached. 
>> 
>> Thank you again for your help!
>> 
>> Best,
>> Ethan
>> 
>> 
>>> On Dec 16, 2019, at 9:44 PM, Xintong Song >> > wrote:
>>> 
>>> Hi Ethan,
>>> 
>>> When you say "it's doing better than before", what is your setups before? 
>>> Is it on-heap managed memory? With preallocation enabled or disabled? Also, 
>>> what deployment (standalone, yarn, or local executor) do you run Flink on? 
>>> It's hard to tell why the performance becomes better without knowing the 
>>> information above.
>>> 
>>> Since you are using RocksDB, and configure managed memory to off-heap, you 
>>> should set pre-allocation to false. Steaming job with RocksDB state backend 
>>> does not use managed memory at all. Setting managed memory to off-heap only 
>>> makes Flink to launch JVM with smaller heap space, leaving more space 
>>> outside JVM. Setting pre-allocation to false makes Flink allocate those 
>>> managed memory on-demand, and since there's no demand the managed memory 
>>> will not be allocated. Therefore, the memory space left outside JVM can be 
>>> fully leveraged by RocksDB.
>>> 
>>> Regarding related source codes, I would recommend the following:
>>> - MemoryManager - For how managed memory is allocated / used. Related to 
>>> pre-allocation.
>>> - ContaineredTaskManagerParameters - For how the JVM memory parameters are 
>>> decided. Related to on-heap / off-heap managed 

Re: How can Flink dynamically change window size and type?

2019-12-18 Thread 陈帅
Such requirements do exist in practice. The simple case is that the element itself carries the window-size setting; the more complex case is a control stream that governs the window size, implemented with connect.

LakeShen wrote on Wed, Dec 18, 2019 at 14:12:

> Try a custom Trigger and define your business-specific firing logic inside it.
>
> 陈帅 wrote on Sat, Dec 14, 2019 at 18:44:
>
> > Does Flink currently support dynamically changing the window size and type? For example, first aggregate over a 5-minute window and later change it to a 10-minute window.
> >
>


Re: Registering a Confluent schema registry Avro topic as a table in Flink SQL

2019-12-18 Thread 陈帅
Thanks for the reply. Since we already have the schema registry URL, why do we still need to provide the subject and avroSchemaStr?

朱广彬 wrote on Wed, Dec 18, 2019 at 10:30:

> Hi 陈帅,
>
> The community indeed does not support the Confluent schema registry Avro format yet. Internally we also rely on the
> schema registry to manage Avro schemas, so we modified the flink-avro source code to support it.
>
> The changes mainly involve these places:
>
> org.apache.flink.formats.avro.{AvroRowFormatFactory,AvroRowDeserializationSchema,AvroRowSerializationSchema}
> and org.apache.flink.table.descriptors.{Avro,AvroValidator}
>
> When using it, just specify the following three parameters while building the Avro descriptor (see the highlighted parts):
>
> tableEnv.connect(
> new Kafka()
> .version("universal")
> .topic(topic)
> .properties(props)
> ).withFormat(
> new Avro()
>   .useRegistry(true)
>   .registryUrl(KAFKA_SCHEMA_REGISTRY_URL_ADDRESS)
>   .registrySubject(subject)
>   .avroSchema(avroSchemaStr)
> )
>
>
> 陈帅 wrote on Wed, Dec 18, 2019 at 08:26:
> >
> > Can Flink SQL register a topic whose data is in an Avro format registered in the Confluent schema registry as a table?
>


Re: How can Flink dynamically change window size and type?

2019-12-18 Thread Utopia
Isn't there the Dynamic gap feature for that?

Best  regards
Utopia
On Dec 18, 2019 at 22:34 +0800, jingjing bai wrote:
> Currently only one kind of window is supported within a single job.
> Dynamically changing it is arguably a false requirement.
> If you only want faster development, I suggest developing with SQL, for example by extending the SQL client yourself.
>
>
> LakeShen wrote on Wed, Dec 18, 2019 at 14:12:
>
> > Try a custom Trigger and define your business-specific firing logic inside it.
> >
> > 陈帅 wrote on Sat, Dec 14, 2019 at 18:44:
> >
> > > Does Flink currently support dynamically changing the window size and type? For example, first aggregate over a 5-minute window and later change it to a 10-minute window.
> > >
> >


Re: How can Flink dynamically change window size and type?

2019-12-18 Thread jingjing bai
Currently only one kind of window is supported within a single job.
Dynamically changing it is arguably a false requirement.
If you only want faster development, I suggest developing with SQL, for example by extending the SQL client yourself.


LakeShen wrote on Wed, Dec 18, 2019 at 14:12:

> Try a custom Trigger and define your business-specific firing logic inside it.
>
> 陈帅 wrote on Sat, Dec 14, 2019 at 18:44:
>
> > Does Flink currently support dynamically changing the window size and type? For example, first aggregate over a 5-minute window and later change it to a 10-minute window.
> >
>


Re: Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread Utopia
Hi Vino,

Thanks for your reply !

The key of my input data is always the same value, so I think there is only one partition.

And why can I sometimes get the value stored in the ValueState before the update?
> > > > before update value : 3
> > > > after update value: 4

What’s more, how can I store the previous value so that I can get it 
when the next element comes in and the onElement method is invoked?



Best  regards
Utopia
On Dec 18, 2019 at 21:57 +0800, vino yang wrote:
> Hi Utopia,
>
> The behavior may be correct.
>
> First, the default value is null. It's the correct value. 
> `ValueStateDescriptor` has multiple constructors, some of them can let you 
> specify a default value. However, these constructors are deprecated. And the 
> doc does not recommend them.[1] For the other constructors which can not 
> specify default values, it would be null.
>
> Second, before the window, there is a `keyBy` operation. it will partition 
> your data. For each partition, the default value state is null.
>
> Best,
> Vino
>
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/index.html?org/apache/flink/api/common/state/ValueStateDescriptor.html
>
> > Utopia wrote on Wed, Dec 18, 2019 at 19:20:
> > > Hi,
> > >
> > > I want to get the last value stored in ValueState when processing element 
> > > in Trigger.
> > >
> > > But as the log shows that sometimes I can get the value, sometimes not.
> > >
> > > Only one key in my data(SensorReading).
> > >
> > > ValueState:
> > > class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {
> > >
> > > private lazy val descriptor = new ValueStateDescriptor[Long]("desc", 
> > > classOf[Long])
> > > var value = 1
> > > override def onElement( r: SensorReading, timestamp: Long, window: 
> > > TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
> > >
> > >   println("before update value : " + 
> > > ctx.getPartitionedState(descriptor).value())
> > >
> > >ctx.getPartitionedState(descriptor).update(value)
> > >
> > >value += 1
> > >
> > >println("after update value: " + 
> > > ctx.getPartitionedState(descriptor).value())
> > >
> > >ctx.registerProcessingTimeTimer(window.maxTimestamp)
> > >TriggerResult.CONTINUE
> > > }
> > >
> > > override def onEventTime(time: Long, window: TimeWindow, ctx: 
> > > Trigger.TriggerContext) = TriggerResult.CONTINUE
> > >
> > > override def onProcessingTime(time: Long, window: TimeWindow, ctx: 
> > > Trigger.TriggerContext) = TriggerResult.FIRE
> > >
> > > override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit 
> > > = {
> > >ctx.deleteProcessingTimeTimer(window.maxTimestamp)
> > >  }
> > >
> > > override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): 
> > > Unit = {
> > >val windowMaxTimestamp = window.maxTimestamp
> > >if (windowMaxTimestamp > ctx.getCurrentProcessingTime) 
> > > ctx.registerProcessingTimeTimer(windowMaxTimestamp)
> > >  }
> > >
> > > override def canMerge: Boolean = true
> > >
> > > }
> > >
> > > Main process:
> > > object MyCustomWindows {
> > >
> > > def main(args: Array[String]): Unit = {
> > >
> > >val env = StreamExecutionEnvironment.getExecutionEnvironment
> > >env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
> > >env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> > >env.getConfig.setAutoWatermarkInterval(1000L)
> > >
> > >val sensorData: DataStream[SensorReading] = env
> > >  .addSource(new SensorSource)
> > >  .assignTimestampsAndWatermarks(new SensorTimeAssigner)
> > >
> > >val countsPerThirtySecs = sensorData
> > >  .keyBy(_.id)
> > > .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
> > >  .trigger(new ProcessingTimeTrigger)
> > >  .process(new CountFunction)
> > >
> > >env.execute()
> > >  }
> > > }
> > >
> > > Log results:
> > >
> > > > before update value : null
> > > > after update value: 1
> > > > before update value : null
> > > > after update value: 2
> > > > before update value : null
> > > > after update value: 3
> > > > before update value : 3
> > > > after update value: 4
> > > > before update value : null
> > > > after update value: 5
> > > > before update value : null
> > > > after update value: 6
> > > > before update value : null
> > > > after update value: 7
> > > > before update value : null
> > > > after update value: 8
> > > > before update value : null
> > > > after update value: 9
> > > > before update value : 9
> > > > after update value: 10
> > >
> > >
> > > Best  regards
> > > Utopia


Re: Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread vino yang
Hi Utopia,

The behavior may be correct.

First, the default value is null. It's the correct value.
`ValueStateDescriptor` has multiple constructors, some of them can let you
specify a default value. However, these constructors are deprecated. And
the doc does not recommend them.[1] For the other constructors which can
not specify default values, it would be null.

Second, before the window, there is a `keyBy` operation. it will partition
your data. For each partition, the default value state is null.

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/index.html?org/apache/flink/api/common/state/ValueStateDescriptor.html
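
If you do not want to rely on the deprecated default-value constructors, you can simply
treat null as "no value yet". A minimal illustration in Java of the read inside
Trigger#onElement (the descriptor name mirrors the code above; this is a sketch, not a
complete trigger):

ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("desc", Long.class);
ValueState<Long> countState = ctx.getPartitionedState(descriptor);
Long previous = countState.value();               // null on the first element for this key + window
long current = (previous == null ? 0L : previous) + 1;
countState.update(current);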

Utopia wrote on Wed, Dec 18, 2019 at 19:20:

> Hi,
>
> I want to get the last value stored in ValueState when processing element
> in Trigger.
>
> But as the log shows that sometimes I can get the value, sometimes not.
>
> Only one key in my data(SensorReading).
>
> ValueState:
>
> class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {
>
>   private lazy val descriptor = new ValueStateDescriptor[Long]("desc", 
> classOf[Long])
>
>   var value = 1
>
>   override def onElement( r: SensorReading, timestamp: Long, window: 
> TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
>
> println("before update value : " + 
> ctx.getPartitionedState(descriptor).value())
>
> ctx.getPartitionedState(descriptor).update(value)
>
> value += 1
>
> println("after update value: " + 
> ctx.getPartitionedState(descriptor).value())
>
> ctx.registerProcessingTimeTimer(window.maxTimestamp)
> TriggerResult.CONTINUE
>   }
>
>   override def onEventTime(time: Long, window: TimeWindow, ctx: 
> Trigger.TriggerContext) = TriggerResult.CONTINUE
>
>   override def onProcessingTime(time: Long, window: TimeWindow, ctx: 
> Trigger.TriggerContext) = TriggerResult.FIRE
>
>   override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = 
> {
> ctx.deleteProcessingTimeTimer(window.maxTimestamp)
>   }
>
>   override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit 
> = {
> val windowMaxTimestamp = window.maxTimestamp
> if (windowMaxTimestamp > ctx.getCurrentProcessingTime) 
> ctx.registerProcessingTimeTimer(windowMaxTimestamp)
>   }
>
>   override def canMerge: Boolean = true
>
> }
>
>
> Main process:
>
> object MyCustomWindows {
>
>   def main(args: Array[String]): Unit = {
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.getConfig.setAutoWatermarkInterval(1000L)
>
> val sensorData: DataStream[SensorReading] = env
>   .addSource(new SensorSource)
>   .assignTimestampsAndWatermarks(new SensorTimeAssigner)
>
> val countsPerThirtySecs = sensorData
>   .keyBy(_.id)
>   .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
>   .trigger(new ProcessingTimeTrigger)
>   .process(new CountFunction)
>
> env.execute()
>   }
> }
>
>
> Log results:
>
> before update value : null
> after update value: 1
> before update value : null
> after update value: 2
> before update value : null
> after update value: 3
> before update value : 3
> after update value: 4
> before update value : null
> after update value: 5
> before update value : null
> after update value: 6
> before update value : null
> after update value: 7
> before update value : null
> after update value: 8
> before update value : null
> after update value: 9
> before update value : 9
> after update value: 10
>
>
>
> Best  regards
> Utopia
>


Re: Deploying Operator on concrete Task Manager

2019-12-18 Thread KristoffSC
Hi,
thanks for the replay.

Just to clarify, I will have to have *a new Flink Cluster* (Job Manager and
Task Manager) that will run in the secure zone and will run the
AsyncEnrich job, right?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Deploying Operator on concrete Task Manager

2019-12-18 Thread Zhu Zhu
Hi KristoffSC,

Flink does not support specifying the TM for tasks.
So I think you need to launch a separate job to do the "AsyncCall + map" in
the secured zone.

Thanks,
Zhu Zhu

KristoffSC wrote on Wed, Dec 18, 2019 at 20:04:

> Hi,
> I have a question regarding job/operator deployment on Task Managers.
>
> If I understand correctly, my job will be spitted into individual tasks,
> which will be "deployed and executed" on particular task slot/s of Task
> Manager (depending on parallelism level of course).
>
> Lets imagine I have a Job that has:
>
> 1. Kafka Source with map to RawEvent
> 2. Enrichment that has AsyncCall + map to EnrichedEvent
> 3. Key stream with another Map/Process functions maybe with some Windowing.
> 4. Sink
>
> I have a grid that has 1 Job Manager and 3 Task Managers nodes.
> For a security reason one Task manager should be in a special network zone
> (red zone).
>
> The point 2 of my Job (Enrichment that has AsyncCall + map to
> EnrichedEvent)
> should be executed on that particular node that is located in a Secured
> Zone.
>
> All other Operations should not be putted on this node.
> Is there a way to configure this?
>
>
> As an alternative I can see that we could have a separate job that will
> just
> have a source, enrich, sink and this job will be "deployed" on this Secured
> Task Manager. Or maybe we should have separate Flink cluster for this?
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Deploying Operator on concrete Task Manager

2019-12-18 Thread KristoffSC
Hi,
I have a question regarding job/operator deployment on Task Managers.

If I understand correctly, my job will be split into individual tasks,
which will be "deployed and executed" on particular task slot/s of Task
Manager (depending on parallelism level of course).

Lets imagine I have a Job that has:

1. Kafka Source with map to RawEvent
2. Enrichment that has AsyncCall + map to EnrichedEvent
3. Key stream with another Map/Process functions maybe with some Windowing.
4. Sink

I have a grid that has 1 Job Manager and 3 Task Managers nodes. 
For a security reason one Task manager should be in a special network zone
(red zone).

The point 2 of my Job (Enrichment that has AsyncCall + map to EnrichedEvent)
should be executed on that particular node that is located in a Secured
Zone.

All other operations should not be placed on this node.
Is there a way to configure this?


As an alternative I can see that we could have a separate job that will just
have a source, enrich, sink and this job will be "deployed" on this Secured
Task Manager. Or maybe we should have separate Flink cluster for this?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [Question] How to use different filesystem between checkpoint data and user data sink

2019-12-18 Thread Yang Wang
You could give the new plugin mechanism a try.
Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then
put your filesystem-related jars in it.
Different plugins are loaded by separate classloaders to avoid conflicts.
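The resulting layout would look roughly like this (the jar name is a placeholder):

    flink-1.9.1/
        lib/
        plugins/
            myhdfs/
                my-hdfs1-filesystem.jar   (contains FileSystemFactoryEnhance and its non-Flink dependencies)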


Best,
Yang

vino yang wrote on Wed, Dec 18, 2019 at 18:46:

> Hi ouywl,
>
> *>>Thread.currentThread().getContextClassLoader();*
>
> What does this statement mean in your program?
>
> In addition, can you share your implementation of the customized file
> system plugin and the related exception?
>
> Best,
> Vino
>
> ouywl wrote on Wed, Dec 18, 2019 at 16:59:
>
>> Hi all,
>> We have implemented a filesystem plugin for sink data to hdfs1, and
>> the yarn for flink running is used hdfs2. So when the job running, the
>> jobmanager use the conf of hdfs1 to create filesystem, the filesystem
>> plugin  is conflict with flink component.
>> We implemeted step:
>>   1.  ‘FileSystemEnhance’ is implement from “FileSystem”
>>   2.  ‘FileSystemFactoryEnhance’ is implement from 
>> “FileSystemFactory”,add
>> kerberos auth in ”FileSystemFactoryEnhance"
>>   3. Add a service entry. Create a file
>> META-INF/services/org.apache.flink.core.fs.FileSystemFactory which
>> contains the class name of “ FileSystemFactoryEnhance.class”
>>
>> And  the job mainclass is :
>> “ public static void main(String[] args) throws Exception {
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.enableCheckpointing(60*1000);
>>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>     env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>     env.getConfig().enableSysoutLogging();
>>
>>     Properties props = new Properties();
>>     props.put("bootstrap.servers", SERVERS);
>>     props.put("group.id", GROUPID);
>>     props.put("enable.auto.commit", "true");
>>     // props.put("auto.commit.interval.ms", "1000");
>>     props.put("session.timeout.ms", "3");
>>     props.put("auto.offset.reset", "latest");
>>     props.put("key.deserializer", org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName());
>>     props.put("value.deserializer", StringDeserializer.class.getName());
>>     FlinkKafkaConsumer010 consumer011 = new FlinkKafkaConsumer010("zyf_test_2", new SimpleStringSchema(), props);
>>     DataStream source = env.addSource(consumer011).setParallelism(1);
>>
>>     source.print();
>>     Thread.currentThread().getContextClassLoader();
>>
>>     StreamingFileSink sink = StreamingFileSink
>>         .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"), new SimpleStringEncoder<>("UTF-8"))
>>         .build();
>>
>>     source.addSink(sink);
>>
>>     env.execute();
>> }”
>>
>> And start the job, the jobmanager filesystem is error, the log means the jobmananger
>> use “FileSystemFactoryEnhance” filesystem and confict.
>>
>> As the url
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems
>> how to avoid use “Thread.currentThread().getContextClassLoader()"
>>
>>
>> ouywl
>> ou...@139.com
>>
>> 
>>
>>


Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread Utopia
Hi,

I want to get the last value stored in ValueState when processing element in 
Trigger.

But as the log shows that sometimes I can get the value, sometimes not.

Only one key in my data(SensorReading).

ValueState:
class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {

private lazy val descriptor = new ValueStateDescriptor[Long]("desc", 
classOf[Long])
var value = 1
override def onElement( r: SensorReading, timestamp: Long, window: TimeWindow, 
ctx: Trigger.TriggerContext): TriggerResult = {

  println("before update value : " + 
ctx.getPartitionedState(descriptor).value())

   ctx.getPartitionedState(descriptor).update(value)

   value += 1

   println("after update value: " + ctx.getPartitionedState(descriptor).value())

   ctx.registerProcessingTimeTimer(window.maxTimestamp)
   TriggerResult.CONTINUE
}

override def onEventTime(time: Long, window: TimeWindow, ctx: 
Trigger.TriggerContext) = TriggerResult.CONTINUE

override def onProcessingTime(time: Long, window: TimeWindow, ctx: 
Trigger.TriggerContext) = TriggerResult.FIRE

override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
   ctx.deleteProcessingTimeTimer(window.maxTimestamp)
 }

override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = {
   val windowMaxTimestamp = window.maxTimestamp
   if (windowMaxTimestamp > ctx.getCurrentProcessingTime) 
ctx.registerProcessingTimeTimer(windowMaxTimestamp)
 }

override def canMerge: Boolean = true

}

Main process:
object MyCustomWindows {

def main(args: Array[String]): Unit = {

   val env = StreamExecutionEnvironment.getExecutionEnvironment
   env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
   env.getConfig.setAutoWatermarkInterval(1000L)

   val sensorData: DataStream[SensorReading] = env
 .addSource(new SensorSource)
 .assignTimestampsAndWatermarks(new SensorTimeAssigner)

   val countsPerThirtySecs = sensorData
 .keyBy(_.id)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
 .trigger(new ProcessingTimeTrigger)
 .process(new CountFunction)

   env.execute()
 }
}

Log results:

before update value : null
after update value: 1
before update value : null
after update value: 2
before update value : null
after update value: 3
before update value : 3
after update value: 4
before update value : null
after update value: 5
before update value : null
after update value: 6
before update value : null
after update value: 7
before update value : null
after update value: 8
before update value : null
after update value: 9
before update value : 9
after update value: 10


Best  regards
Utopia


Re: [Question] How to use different filesystem between checkpoint data and user data sink

2019-12-18 Thread vino yang
Hi ouywl,

*>>Thread.currentThread().getContextClassLoader();*

What does this statement mean in your program?

In addition, can you share your implementation of the customized file
system plugin and the related exception?

Best,
Vino

ouywl wrote on Wed, Dec 18, 2019 at 16:59:

> Hi all,
> We have implemented a filesystem plugin for sink data to hdfs1, and
> the yarn for flink running is used hdfs2. So when the job running, the
> jobmanager use the conf of hdfs1 to create filesystem, the filesystem
> plugin  is conflict with flink component.
> We implemeted step:
>   1.  ‘FileSystemEnhance’ is implement from “FileSystem”
>   2.  ‘FileSystemFactoryEnhance’ is implement from “FileSystemFactory”,add
> kerberos auth in ”FileSystemFactoryEnhance"
>   3. Add a service entry. Create a file
> META-INF/services/org.apache.flink.core.fs.FileSystemFactory which
> contains the class name of “ FileSystemFactoryEnhance.class”
>
> And  the job mainclass is :
> “ public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.enableCheckpointing(60*1000);
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>     env.getConfig().enableSysoutLogging();
>
>     Properties props = new Properties();
>     props.put("bootstrap.servers", SERVERS);
>     props.put("group.id", GROUPID);
>     props.put("enable.auto.commit", "true");
>     // props.put("auto.commit.interval.ms", "1000");
>     props.put("session.timeout.ms", "3");
>     props.put("auto.offset.reset", "latest");
>     props.put("key.deserializer", org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName());
>     props.put("value.deserializer", StringDeserializer.class.getName());
>     FlinkKafkaConsumer010 consumer011 = new FlinkKafkaConsumer010("zyf_test_2", new SimpleStringSchema(), props);
>     DataStream source = env.addSource(consumer011).setParallelism(1);
>
>     source.print();
>     Thread.currentThread().getContextClassLoader();
>
>     StreamingFileSink sink = StreamingFileSink
>         .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"), new SimpleStringEncoder<>("UTF-8"))
>         .build();
>
>     source.addSink(sink);
>
>     env.execute();
> }”
>
> And start the job, the jobmanager filesystem is error, the log means the jobmananger
> use “FileSystemFactoryEnhance” filesystem and confict.
>
> As the url
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems
> how to avoid use “Thread.currentThread().getContextClassLoader()"
>
>
> ouywl
> ou...@139.com
>
> 
>
>


Re: Re: FLINK 1.9 + YARN + SessionWindows + large data volume + OOM after running for a while

2019-12-18 Thread Xintong Song
   - "The memory the TaskManager allocates for sorting, hash tables and caching intermediate results lives off the JVM heap"
   applies to batch (dataset / blink sql) jobs. Yours looks like a streaming job, so setting taskmanager.memory.off-heap to true
   simply reduces the JVM heap size and leaves space for RocksDB.
   - There is a flink-examples directory with some example jobs, but they mainly demonstrate API usage. There are no examples
   for deployment or resource tuning yet.
   - Also, the solution I described in my previous email targets Flink 1.9 and earlier. The resource configuration has changed
   considerably in the not-yet-released Flink 1.10; if you are interested, keep an eye on the related documentation after it is released.

Thank you~

Xintong Song



On Wed, Dec 18, 2019 at 4:49 PM USERNAME  wrote:

> @tonysong...@gmail.com Thanks for the reply.
> I looked into what the parameter means:
> taskmanager.memory.off-heap:
> if set to true, the memory the TaskManager allocates for sorting, hash tables and caching intermediate results lives off the JVM heap. For setups with a large amount of memory this can make memory-bound operations more efficient (default: false).
> The memory used inside the JVM heap is limited by YARN, while off-heap memory is not; if that is the case, it indeed explains my current problem.
> I have already made the change and am testing it. Many thanks, tonysong...@gmail.com.
> Does Flink have any best-practice example projects that show these details, to make things simpler for everyone and showcase Flink's strength?
>
>
>
> On 2019-12-17 18:16:02, "Xintong Song" wrote:
> >This is not an OOM; the container exceeded its memory limit and was killed by YARN.
> >JVM memory cannot be over-used, otherwise it would report an OOM. So it is more likely that increasing RocksDB memory usage caused the overuse.
> >
> >Suggestions:
> >
> >1. Add the following configuration:
> >taskmanager.memory.off-heap: true
> >taskmanager.memory.preallocate: false
> >
> >2. If you have already applied the configuration above, or the problem persists after changing it, try increasing the following option (the default value is 0.25 when not configured):
> >containerized.heap-cutoff-ratio
> >
> >Thank you~
> >
> >Xintong Song
> >
> >
> >
> >On Tue, Dec 17, 2019 at 5:49 PM USERNAME  wrote:
> >
> >> Version: flink 1.9.1
> >> -- Launch command
> >> flink run -d -m yarn-cluster -yn 40 -ys 2 
> >>
> >>
> >> -- Partial code
> >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >> RocksDBStateBackend backend = new RocksDBStateBackend(CHECKPOINT_PATH,
> >> true);
> >>
> >>
> >> .keyBy("imei")  // 100K+ keys
> >> .window(EventTimeSessionWindows.withGap(Time.hours(1))) // a device is considered offline after more than 1 hour without data
> >> .trigger(new Trigger())
> >> .aggregate(new AggregateFunction(), new ProcessWindowFunction())
> >>
> >>
> >> -- Data
> >> 100K+ devices in total, each sending one record every 30 seconds, roughly 200K records per minute.
> >>
> >>
> >> -- Symptom
> >> After running for a while (a few days), the taskmanager dies.
> >>
> >>
> >> -- Questions
> >> 1. Does Flink memory keep growing?
> >> The data volume is fairly large and windows may be retained for a long time, but once the data has been computed it is no longer needed, and I also configured StateTtlConfig. I do not know where or what keeps the memory occupied; maybe my usage is wrong, and I would appreciate some guidance.
> >> 2. Can the flink / yarn configuration be tuned?
> >> Are there best practices for configuring Flink on YARN?
> >>
> >>
> >> This problem has bothered me for a long time, across 1.7, 1.8 and 1.9; I hope someone familiar with the internals or who has had a similar problem can advise.
> >>
> >>
> >>
> >>
> >> -- Error message --> nodemanager.log
> >>
> >>
> >> 2019-12-17 16:55:16,545 WARN
> >>
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
> >> Process tree for container: container_e16_1575354121024_0050_01_08
> has
> >> processes older than 1 iteration running over the configured limit.
> >> Limit=3221225472, current usage = 3222126592
> >> 2019-12-17 16:55:16,546 WARN
> >>
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
> >> Container
> >> [pid=184523,containerID=container_e16_1575354121024_0050_01_08] is
> >> running 901120B beyond the 'PHYSICAL' memory limit. Current usage: 3.0
> GB
> >> of 3 GB physical memory used; 4.9 GB of 30 GB virtual memory used.
> Killing
> >> container.
> >> Dump of the process-tree for container_e16_1575354121024_0050_01_08
> :
> >> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> >> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
> >> |- 184701 184523 184523 184523 (java) 21977 4845 5166649344 786279
> >> /usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m
> >> -XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC
> >> -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
> >> -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
> >>
> -Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log
> >> -Dlogback.configurationFile=file:./logback.xml
> >> -Dlog4j.configuration=file:./log4j.properties
> >> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir .
> >> |- 184523 184521 184523 184523 (bash) 2 3 118067200 373 /bin/bash -c
> >> /usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m
> >> -XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC
> >> -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
> >> -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
> >>
> -Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log
> >> -Dlogback.configurationFile=file:./logback.xml
> >> -Dlog4j.configuration=file:./log4j.properties
> >> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1>
> >>
> /opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.out
> >> 2>
> >>
> /opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.err
> >>
> >>
> >>
> >> 2019-12-17 16:55:16,546 INFO
> >>
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
> >> Removed ProcessTree with root 184523
> >> 2019-12-17 16:55:16,547 INFO
> >>
> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl:
> >> 

Re: How to reprocess certain events in Flink?

2019-12-18 Thread Zhu Zhu
Hi Pooja,

My main confusion is, if 2 events have the same transaction_id, how can we
tell if it is a wanted one for value updates, or it is an unwanted
duplicate?

MapState with a TTL can be used for deduplicating, if it is supposed that a
duplicated event would not happen too late after the original event was
processed.
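
A rough Java sketch of that idea, assuming each event carries a transaction id and that
duplicates arrive within one hour (both are assumptions):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Keyed by transaction_id: forwards an event only the first time its id is seen;
// the seen-marker expires via TTL so the state does not grow forever.
public class DedupByTransactionId extends RichFlatMapFunction<DedupByTransactionId.Event, DedupByTransactionId.Event> {

    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>("seen", Boolean.class);
        StateTtlConfig ttl = StateTtlConfig
                .newBuilder(Time.hours(1))   // assumed duplicate horizon
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        descriptor.enableTimeToLive(ttl);
        seen = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Event event, Collector<Event> out) throws Exception {
        if (seen.value() == null) {          // first time this transaction_id is seen
            seen.update(true);
            out.collect(event);
        }                                     // otherwise drop it as a duplicate
    }

    // Assumed event shape, for illustration only.
    public static class Event {
        public String transactionId;
        public String key1;
        public String key2;
        public long value;
    }
}

Usage would be something like events.keyBy(e -> e.transactionId).flatMap(new DedupByTransactionId())
before the aggregation.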

Thanks,
Zhu Zhu

Rafi Aroch wrote on Wed, Dec 18, 2019 at 15:50:

> Hi Pooja,
>
> Here's an implementation from Jamie Grier for de-duplication using
> in-memory cache with some expiration time:
>
> https://github.com/jgrier/FilteringExample/blob/master/src/main/java/com/dataartisans/DedupeFilteringJob.java
>
> If for your use-case you can limit the time period where duplications may
> happen, you can use this approach.
>
> Thanks,
> Rafi
>
>
> On Wed, Dec 18, 2019 at 8:16 AM Pooja Agrawal 
> wrote:
>
>> Hey,
>>
>> I am sorry for the confusion. So, the value is not already present in the
>> event. We are reading it from a static table (kind of data enrichment in
>> flink job). Above event is an enriched event.
>> If we say that this is some transaction event, the user would have done
>> the transaction once and hence the transaction_id is unique. But, the table
>> from where we are reading the value may contain the wrong value (not
>> always, sometimes because of bug). In this case, we may want to reprocess
>> that transaction event with new value (here, the transaction_id will be
>> same as previous, but the value will change). I hope this clears the
>> scenario. Let me know if you have any other questions.
>>
>> To solve the idempotency problem, you suggested to maintain a set
>> recording transaction_id(s). Since, we are aggregating over all events seen
>> till now, the number of events and hence ids will be too large. I am
>> assuming we will need to have some external store here and do a lookup
>> every time we process an event. This may increase the latency. Can you
>> suggest the efficient way to solve this? and if we need to have an external
>> store, what will be the best candidate?
>>
>> Thanks
>> Pooja
>>
>>
>>
>> On Wed, Dec 18, 2019 at 8:19 AM Zhu Zhu  wrote:
>>
>>> Hi Pooja,
>>>
>>> I'm a bit confused since in 1) it says that "If two events have same
>>> transaction_id, we can say that they are duplicates", and in 2) it says
>>> that "Since this is just a value change, the transaction_id will be same".
>>> Looks to me they are conflicting. Usually in case 2) scenarios, the value
>>> updates event is considered as new event which does not share the unique id
>>> with prior events.
>>>
>>> If each event has a unique transaction_id, you can use it to
>>> de-duplicate the events via a set recording the transaction_id(s) which are
>>> already processed. And then 2) would not be a problem with the unique
>>> transaction_id assumption.
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
> Pooja Agrawal wrote on Tue, Dec 17, 2019 at 21:17:
>>>

 Hi,

 I have a use case where we are reading events from kinesis stream.The
 event can look like this
 Event {
   event_id,
   transaction_id
   key1,
   key2,
   value,
   timestamp,
   some other fields...
 }

 We want to aggregate the values per key for all events we have seen
 till now (as simple as "select key1, key2, sum(value) from events group by
 key1, key2key."). For this I have created a simple flink job which uses
 flink-kinesis connector and applies keyby() and sum() on the incoming
 events. I am facing two challenges here:

 1) The incoming events can have duplicates. How to maintain exactly
 once processing here, as processing duplicate events can give me erroneous
 result? The field transaction_id will be unique for each events. If two
 events have same transaction_id, we can say that they are duplicates (By
 duplicates here, I don't just mean the retried ones. The same message can
 be present in kinesis with different sequence number. I am not sure if
 flink-kinesis connector can handle that, as it is using KCL underlying
 which I assume doesn't take care of it)

 2) There can be the the cases where the value has been updated for a
 key after processing the event and we may want to reprocess those events
 with new value. Since this is just a value change, the transaction_id will
 be same. The idempotency logic will not allow to reprocess the events. What
 are the ways to handle such scenarios in flink?

 Thanks
 Pooja


 --
 Warm Regards,
 Pooja Agrawal

>>>
>>
>> --
>> Warm Regards,
>> Pooja Agrawal
>>
>


[Question] How to use different filesystem between checkpoint data and user data sink

2019-12-18 Thread ouywl







Hi all,

We have implemented a filesystem plugin to sink data to hdfs1, while the YARN cluster that Flink runs on uses hdfs2. When the job runs, the JobManager uses the hdfs1 configuration to create its filesystem, and the filesystem plugin conflicts with the Flink components.

Our implementation steps:
  1. 'FileSystemEnhance' is derived from "FileSystem".
  2. 'FileSystemFactoryEnhance' implements "FileSystemFactory" and adds Kerberos authentication in "FileSystemFactoryEnhance".
  3. Add a service entry: create a file META-INF/services/org.apache.flink.core.fs.FileSystemFactory which contains the class name of "FileSystemFactoryEnhance".

The job main class is:

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60 * 1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getConfig().enableSysoutLogging();

        Properties props = new Properties();
        props.put("bootstrap.servers", SERVERS);
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        // props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "3");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        FlinkKafkaConsumer010<String> consumer011 =
                new FlinkKafkaConsumer010<>("zyf_test_2", new SimpleStringSchema(), props);
        DataStream<String> source = env.addSource(consumer011).setParallelism(1);
        source.print();

        Thread.currentThread().getContextClassLoader();

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"), new SimpleStringEncoder<>("UTF-8"))
                .build();
        source.addSink(sink);

        env.execute();
    }

When we start the job, the JobManager fails to create its filesystem; the log shows that the JobManager picks up the "FileSystemFactoryEnhance" filesystem, which conflicts. Following https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems, how can we avoid using "Thread.currentThread().getContextClassLoader()"?
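
For reference, a minimal sketch of the factory from step 2, written against the Flink 1.9 FileSystemFactory interface (the class names, the returned scheme and the FileSystemEnhance constructor are illustrative, not our exact code):

    import java.io.IOException;
    import java.net.URI;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.FileSystemFactory;

    public class FileSystemFactoryEnhance implements FileSystemFactory {

        private Configuration flinkConfig;

        @Override
        public String getScheme() {
            // the factory is bound to a URI scheme; returning "hdfs" here makes it
            // serve the checkpoint paths on hdfs2 as well, which seems to be our conflict
            return "hdfs";
        }

        @Override
        public void configure(Configuration config) {
            this.flinkConfig = config;   // pick up hdfs1 / Kerberos settings here
        }

        @Override
        public FileSystem create(URI fsUri) throws IOException {
            // log in against hdfs1's Kerberos, then hand back the wrapped filesystem
            // (FileSystemEnhance constructor is assumed for this sketch)
            return new FileSystemEnhance(fsUri, flinkConfig);
        }
    }

From our reading of the pluggable-file-systems page, the jar is supposed to go under the plugins/ directory so that it is loaded by a dedicated plugin classloader; is that the intended way to avoid calling Thread.currentThread().getContextClassLoader() in the job?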

ouywl
ou...@139.com

Re: jobmanager exception log

2019-12-18 Thread Dino Zhang
This only tells you there was a timeout; you need to check the taskmanager logs for the actual cause.

On Mon, Dec 16, 2019 at 2:49 PM pengchenglin  wrote:

> Hi all,
>
> I saw this in the logs of the standalone JobManager node. The taskmanager on host 153 went down, and I am not sure whether the error below caused it. Does anyone know what this Error means?
>
> 2019-12-15 17:15:21.999 [flink-metrics-379] ERROR akka.remote.Remoting
> flink-metrics-akka.remote.default-remote-dispatcher-20 - Association to
> [akka.tcp://flink-metr...@xx.xx.xx.153:35929] with UID [1
> 617823256] irrecoverably failed. Quarantining address.
> java.util.concurrent.TimeoutException: Remote system has been silent for
> too long. (more than 48.0 hours)
> at
> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:386)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at
> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>

-- 
Regards,
DinoZhang


Re:Re: FLINK 1.9 + YARN + SessionWindows + large data volume + OOM after running for a while

2019-12-18 Thread USERNAME
@tonysong...@gmail.com Thanks for the reply.
I looked up what the parameter means:
taskmanager.memory.off-heap:
If set to true, the memory the TaskManager allocates for sorting, hash tables and caching of intermediate results is located outside the JVM heap. For setups with larger amounts of memory, this can improve the efficiency of operations performed on that memory (default: false).
The memory used inside the JVM heap is limited by YARN, while off-heap memory is not; if that is the case, it would indeed explain my current problem.
I have made the change and am testing it now. Many thanks, tonysong...@gmail.com.
Does Flink have any best-practice sample projects that demonstrate these kinds of details? That would make Flink easier to use and show off its strengths.



At 2019-12-17 18:16:02, "Xintong Song"  wrote:
>This is not an OOM. The container exceeded its memory budget and was killed by YARN.
>JVM memory cannot be over-used (otherwise an OOM would be reported), so the overuse most likely comes from RocksDB's memory usage growing.
>
>Suggestions:
>
>1. Add the following configuration:
>taskmanager.memory.off-heap: true
>taskmanager.memory.preallocate: false
>
>2. If you have already applied the configuration above, or the problem persists after changing it, try increasing the option below (the default when unset is 0.25):
>containerized.heap-cutoff-ratio
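>
>Put together as a flink-conf.yaml fragment (the 0.3 is only an illustrative value to start from, not a recommendation; the default is 0.25):
>
>taskmanager.memory.off-heap: true
>taskmanager.memory.preallocate: false
>containerized.heap-cutoff-ratio: 0.3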
>
>Thank you~
>
>Xintong Song
>
>
>
>On Tue, Dec 17, 2019 at 5:49 PM USERNAME  wrote:
>
>> Version: flink 1.9.1
>> --Launch command
>> flink run -d -m yarn-cluster -yn 40 -ys 2 
>>
>>
>> --Partial code
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> RocksDBStateBackend backend = new RocksDBStateBackend(CHECKPOINT_PATH,
>> true);
>>
>>
>> .keyBy("imei")  //100K+ keys
>> .window(EventTimeSessionWindows.withGap(Time.hours(1))) //a device is considered offline after no data point for more than 1 hour
>> .trigger(new Trigger())
>> .aggregate(new AggregateFunction(), new ProcessWindowFunction())
>>
>>
>> --Data
>> 100K+ devices in total; each device sends one record every 30 seconds, roughly 200K records per minute.
>>
>>
>> --Symptom
>> After running for a while (a few days), the taskmanager dies.
>>
>>
>> --Questions
>> 1. Why does Flink's memory keep growing?
>> The data volume is large and the window retention can be very long, but once the data has been aggregated it is no longer needed. I also configured StateTtlConfig, yet I don't know
>> where or what keeps holding on to memory. My usage may be wrong; I would appreciate some guidance.
>> 2. Can the flink / yarn configuration be tuned?
>> Are there best practices for configuring Flink on YARN?
>>
>>
>> This problem has troubled me for a long time, across 1.7, 1.8 and 1.9. I hope someone familiar with the internals, or who has run into a similar issue, can point me in the right direction.
>>
>>
>>
>>
>> --Error message --> nodemanager.log
>>
>>
>> 2019-12-17 16:55:16,545 WARN
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>> Process tree for container: container_e16_1575354121024_0050_01_08 has
>> processes older than 1 iteration running over the configured limit.
>> Limit=3221225472, current usage = 3222126592
>> 2019-12-17 16:55:16,546 WARN
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>> Container
>> [pid=184523,containerID=container_e16_1575354121024_0050_01_08] is
>> running 901120B beyond the 'PHYSICAL' memory limit. Current usage: 3.0 GB
>> of 3 GB physical memory used; 4.9 GB of 30 GB virtual memory used. Killing
>> container.
>> Dump of the process-tree for container_e16_1575354121024_0050_01_08 :
>> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
>> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>> |- 184701 184523 184523 184523 (java) 21977 4845 5166649344 786279
>> /usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m
>> -XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC
>> -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
>> -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
>> -Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log
>> -Dlogback.configurationFile=file:./logback.xml
>> -Dlog4j.configuration=file:./log4j.properties
>> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir .
>> |- 184523 184521 184523 184523 (bash) 2 3 118067200 373 /bin/bash -c
>> /usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m
>> -XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC
>> -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
>> -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
>> -Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log
>> -Dlogback.configurationFile=file:./logback.xml
>> -Dlog4j.configuration=file:./log4j.properties
>> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1>
>> /opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.out
>> 2>
>> /opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.err
>>
>>
>>
>> 2019-12-17 16:55:16,546 INFO
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>> Removed ProcessTree with root 184523
>> 2019-12-17 16:55:16,547 INFO
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl:
>> Container container_e16_1575354121024_0050_01_08 transitioned from
>> RUNNING to KILLING
>> 2019-12-17 16:55:16,549 INFO
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
>> Cleaning up container container_e16_1575354121024_0050_01_08
>> 2019-12-17 16:55:16,579 WARN
>> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
>> code from container container_e16_1575354121024_0050_01_08 is : 143


Re: (additional image link) Question about setting the Watermark directly vs. setting it after flatMap

2019-12-18 Thread Dino Zhang
Kafka's exactly-once is guaranteed by the checkpoint mechanism, which saves the consumer offsets; it has nothing to do with event time. Just extract the event
time and assign the watermark before the stream enters the time window.
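
A minimal sketch of that ordering (the "MyEvent" type, its parse/getEventTime
methods and the 10s out-of-orderness bound are assumptions made for
illustration, not part of the original question):

    DataStream<MyEvent> events = env
            .addSource(flinkKafkaConsumer)            // offsets restored from checkpoints -> exactly-once source
            .flatMap(new FlatMapFunction<String, MyEvent>() {
                @Override
                public void flatMap(String raw, Collector<MyEvent> out) {
                    for (MyEvent e : MyEvent.parse(raw)) {   // one Kafka record may yield 1..N events
                        out.collect(e);
                    }
                }
            })
            .assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
                        @Override
                        public long extractTimestamp(MyEvent event) {
                            return event.getEventTime();     // business event time parsed out in flatMap
                        }
                    });
    // windows applied on "events" now use the business event time,
    // while the Kafka source's exactly-once guarantee is unaffected.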

On Wed, Dec 18, 2019 at 4:12 PM 猫猫 <16770...@qq.com> wrote:

> The image could not be pasted, so I put it on github:
> https://github.com/maobuji/Blog/blob/master/image/changeEventTime.jpg
>
>
>
>
> --Original message--
> From: "猫猫" <16770...@qq.com;
> Sent: Wednesday, December 18, 2019, 4:03 PM
> To: "user-zh"
> Subject: Re: Question about setting the Watermark directly vs. setting it after flatMap
>
>
>
> Maybe I wasn't clear. My understanding is that there is only one event time across the whole streaming pipeline.
> But when reading from Kafka I cannot directly get the event-time carried in the business data, because one record may be split into multiple records.
> I can only read the payload out as a string and set the event time to Kafka's timestamp.
>
>
> Only after one round of message transformation can I get at the time inside the data, so I can only set the event-time correctly after flatMap.
> But I also need Kafka's exactly-once property.
>
> So the question becomes: can I reset the eventTime within the stream without affecting the original stream (the Kafka commits)?
> That is the question I asked before.
> env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...)
>
> env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...);
>
>
> Going further, can I, as in the picture below, set different eventTimes in different parts of the stream, and still have them commit as a whole?
>
>
>
>
>
> -- Original message --
> From: "LakeShen"; Sent: Wednesday, December 18, 2019, 2:10 PM
> To: "user-zh"
> Subject: Re: Question about setting the Watermark directly vs. setting it after flatMap
>
>
>
> In your flatMap logic, did you handle the record's timestamp? A watermark only advances when the new watermark's timestamp is greater than the previous one and non-null.
>
> 猫猫 <16770...@qq.com wrote on Wed, Dec 18, 2019 at 9:27 AM:
>
>  env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...)
> 
> 
> env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...);
> 
>  When consuming from Kafka, what difference and what impact is there between assigning the Watermark directly and assigning it after flatMap()?
>  flatMap may turn one record into 1-N records. In that case, can Kafka's exactly-once still be guaranteed?



-- 
Regards,
DinoZhang


(additional image link) Question about setting the Watermark directly vs. setting it after flatMap

2019-12-18 Thread 猫猫
The image could not be pasted, so I put it on github:
https://github.com/maobuji/Blog/blob/master/image/changeEventTime.jpg




--Original message--
From: "猫猫" <16770...@qq.com;
Sent: Wednesday, December 18, 2019, 4:03 PM
To: "user-zh"

Re: Question about setting the Watermark directly vs. setting it after flatMap

2019-12-18 Thread 猫猫

Maybe I wasn't clear. My understanding is that there is only one event time across the whole streaming pipeline.
But when reading from Kafka I cannot directly get the event-time carried in the business data, because one record may be split into multiple records.
I can only read the payload out as a string and set the event time to Kafka's timestamp.


Only after one round of message transformation can I get at the time inside the data, so I can only set the event-time correctly after flatMap.
But I also need Kafka's exactly-once property.

So the question becomes: can I reset the eventTime within the stream without affecting the original stream (the Kafka commits)?
That is the question I asked before.
env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...)
env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...);


Going further, can I, as in the picture below, set different eventTimes in different parts of the stream, and still have them commit as a whole?




-- Original message --
From: "LakeShen"