How to custom (or use) a window to specify everyday's beginning as watermark?

2019-11-25 Thread Rock
I need my job to aggregate every device's metrics into a daily report, but I did
not find a window that covers exactly one day, or a way to make each day's
beginning act as the watermark boundary. Should I implement a custom window, or
is there another way to achieve this?
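
For reference, a minimal sketch of one common approach: a one-day tumbling
event-time window whose offset shifts the boundary to the local day's beginning.
Class names, types and the offset below are placeholder assumptions, not code
from this thread.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class DailyDeviceReportSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Placeholder input of (deviceId, metricValue); a real job would assign
        // event-time timestamps and watermarks from the device events.
        env.fromElements(Tuple2.of("device-1", 1L), Tuple2.of("device-1", 2L))
            .keyBy(0)
            // One-day window; the offset (here UTC+8) moves the boundary to local midnight.
            .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
            .sum(1)
            .print();

        env.execute("daily device report sketch");
    }
}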

Kerberos authentication issue with Flink on YARN

2019-11-25 Thread venn
Hi all,

A question about Flink authentication: our Flink on YARN runs on a Hadoop
cluster without authentication; how can it access HBase in a cluster that
requires Kerberos authentication?

 

Below is a description of our setup and the problems we found:

We have two Hadoop clusters: one uses Kerberos authentication and the other uses
simple authentication. Flink 1.9.0 is deployed on the cluster with simple
authentication.

Recently we ran into a problem when using Flink to read HBase on the
Kerberos-authenticated cluster. We configured the relevant parameters in
flink-conf.yaml: security.kerberos.login.keytab and
security.kerberos.login.principal.
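
For reference, a minimal sketch of the flink-conf.yaml entries in question; the
keytab path and principal below are placeholders:

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM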

We plan to read HBase synchronously inside a map function: take the rowkey from
the input data and fetch the HBase row with a get. After the program starts, it
appears to be "stuck" on the map operator until the HBase get times out, and no
data can be read. The taskmanager.log contains entries like the following:


org.apache.flink.yarn.YarnTaskExecutorRunner   - OS current user: yarn

org.apache.flink.yarn.YarnTaskExecutorRunner   - current Hadoop/Kerberos
user: admin (note: the login user)

 

org.apache.flink.yarn.YarnTaskExecutorRunner   - YARN daemon is running as:
admin Yarn client user obtainer: admin

org.apache.flink.runtime.security.modules.HadoopModule  - Hadoop user set to
admin (auth:SIMPLE)

 

After reading the corresponding code, we added the parameter
"hadoop.security.authentication = kerberos" to the Hadoop configuration files
(note: the simple-authentication Hadoop cluster is an HDP cluster deployed with
Ambari; with Kerberos disabled, the value of "hadoop.security.authentication"
is simple). This makes authentication pass, but the Flink job then stays in the
CREATED state, and taskmanager.log keeps reporting "server asks us to fall back
to SIMPLE auth. But the client is configured to only allow secure connections".

 

 

The official documentation has the following description:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/security-kerberos.html


Hadoop Security Module

This module uses the Hadoop UserGroupInformation (UGI) class to establish a
process-wide login user context. The login user is then used for all
interactions with Hadoop, including HDFS, HBase, and YARN.

If Hadoop security is enabled (in core-site.xml), the login user will have
whatever Kerberos credential is configured. Otherwise, the login user
conveys only the user identity of the OS account that launched the cluster.

 

 

 





Re: Flink behavior as a slow consumer - out of Heap MEM

2019-11-25 Thread vino yang
Hi Hanan,

Sometimes, the behavior depends on your implementation.

Since it's not a built-in connector, it would be better to share your
customized source with the community, so that the community can help you
figure out where the problem is.

WDYT?

Best,
Vino

Hanan Yehudai  于2019年11月26日周二 下午12:27写道:

> HI ,  I am trying to do some performance test to my flink deployment.
>
> I am implementing an extremely simplistic use case
>
> I built a ZMQ Source
>
>
>
> The topology is ZMQ Source - > Mapper- > DIscardingSInk ( a sink that does
> nothing )
>
>
>
> Data is pushed via ZMQ at a very high rate.
>
> When the incoming  rate from ZMQ is higher then the rate flink can keep up
> with,  I can see that the JVM Heap is filling up  ( using Flinks metrics ) .
> when the heap is fullt consumes – TM chokes , a HeartBeat is missed  and
> the job fails.
>
>
>
> I was expecting Flink to handle this type of backpressure gracefully and
> not
>
>
>
> Note :  The mapper has not state to persist
>
> See below the Grafana charts,  on the left  is the TM HHEAP  Used.
>
> On the right – the ZMQ – out of flink. ( yellow ) Vs Flink consuming rate
> from reported by ZMQSOurce
>
> 1GB is the configured heap size
>
>
>
>


Flink behavior as a slow consumer - out of Heap MEM

2019-11-25 Thread Hanan Yehudai
Hi, I am trying to do some performance tests on my Flink deployment.
I am implementing an extremely simplistic use case.
I built a ZMQ source.

The topology is ZMQ Source -> Mapper -> DiscardingSink (a sink that does
nothing).

Data is pushed via ZMQ at a very high rate.
When the incoming rate from ZMQ is higher than the rate Flink can keep up
with, I can see that the JVM heap is filling up (using Flink's metrics).
When the heap is fully consumed, the TM chokes, a heartbeat is missed, and the
job fails.

I was expecting Flink to handle this type of backpressure gracefully and not fail.

Note: the mapper has no state to persist.
See the Grafana charts below: on the left is the TM heap used; on the right, the
ZMQ output rate (yellow) vs. the Flink consuming rate reported by the ZMQ source.
1 GB is the configured heap size.
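
For illustration, a minimal, hypothetical sketch of a ZMQ-style SourceFunction
(not the poster's actual code), assuming the jeromq client library; the endpoint,
socket type and message handling are placeholders. ctx.collect() blocks when
downstream buffers are full, which is how backpressure normally reaches the
receive loop; sustained heap growth usually points to buffering inside the source
or its client library instead.

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.zeromq.ZMQ;

public class ZmqSourceSketch extends RichSourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket socket = context.socket(ZMQ.PULL);
        socket.connect("tcp://localhost:5555"); // placeholder endpoint
        try {
            while (running) {
                String msg = socket.recvStr(); // blocks until a message arrives
                if (msg == null) {
                    continue;
                }
                // Emit under the checkpoint lock so emission stays consistent with checkpoints.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(msg);
                }
            }
        } finally {
            socket.close();
            context.term();
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}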

[attachment: Grafana charts showing TM heap usage and ZMQ vs. Flink consumption rates]


Fwd: How to recover state from savepoint on embedded mode?

2019-11-25 Thread Reo Lei
-- Forwarded message -
From: Reo Lei 
Date: 2019年11月26日周二 上午9:53
Subject: Re: How to recover state from savepoint on embedded mode?
To: Yun Tang 


Hi Yun,
Thanks for your reply. What I mean by embedded mode is that the whole Flink
cluster and job, including the JobManager, TaskManager, and the job application
itself, run within a single local JVM process, i.e. the job uses the
"LocalStreamEnvironment". The start command looks like this:
"java -Xmx512M -XX:... -Dlog.file=... -cp flink-job.jar
com.a.b.c.MyJob > /dev/null &"

The reason I am not using standalone mode to run the job is that the runtime
environment has no ZooKeeper and ZooKeeper will not be installed, so I need to
rely on embedded mode to run my job.
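
For reference, one hedged workaround sketch assuming Flink 1.9: instead of
LocalStreamEnvironment, the job graph can be submitted to a local MiniCluster
with savepoint restore settings attached. The classes below exist in Flink, but
MiniCluster is an internal API, and the path, parallelism and placeholder
pipeline are assumptions.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EmbeddedRestoreSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        env.fromElements(1, 2, 3).print(); // placeholder: build the real job topology here

        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        jobGraph.setSavepointRestoreSettings(
                SavepointRestoreSettings.forPath("file:///path/to/savepoint", true));

        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
                .setConfiguration(new Configuration())
                .setNumTaskManagers(1)
                .setNumSlotsPerTaskManager(1)
                .build();

        MiniCluster miniCluster = new MiniCluster(cfg);
        try {
            miniCluster.start();
            miniCluster.executeJobBlocking(jobGraph); // runs the restored job to completion
        } finally {
            miniCluster.close();
        }
    }
}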

BR,
Reo

Yun Tang  于2019年11月26日周二 上午2:38写道:

> What is the embedded mode mean here? If you refer to SQL embedded mode,
> you cannot resume from savepoint now; if you refer to local standalone
> cluster, you could use `bin/flink run -s` to resume on a local cluster.
>
>
>
> Best
>
> Yun Tang
>
>
>
> *From: *Reo Lei 
> *Date: *Tuesday, November 26, 2019 at 12:37 AM
> *To: *"user@flink.apache.org" 
> *Subject: *How to recover state from savepoint on embedded mode?
>
>
>
> Hi,
>
> I have a job need running on embedded mode, but need to init some rule
> data from a database before start. So I used the State Processor API to
> construct my state data and save it to the local disk. When I want to used
> this savepoint to recover my job, I found resume a job from a savepoint
> need to use the command `bin/flink run -s :savepointPath *[*:runArgs]` to
> submit a job to flink cluster. That is mean the job is run on remote mode,
> not embedded mode.
>
>
>
> And I was wondering why I can't resume a job from a savepoint on embedded
> mode. If that is possible, what should I do?
>
> BTW, if we can not  resume a job from a savepoint on embedded mode, how to
> know the savepoint is constructed correctly in develop environment and use
> idea to debug it?
>
>
>
> BR,
>
> Reo
>
>
>


Re: Apache Flink - Troubleshooting exception while starting the job from savepoint

2019-11-25 Thread M Singh
 Hi Kostas/Congxian:
Thanks for your response.
Based on your feedback, I found that I had missed adding a uid to one of the
stateful operators, and correcting that resolved the issue. I still have
stateless operators for which no uid is specified in the application.
So, I thought that adding a uid was optional, and that if we don't add it and run
another instance of the same app from a savepoint or checkpoint, it will pick
up the state based on the generated uid. Is that a correct understanding?
Also, if some stateful operators have uids but some don't, will it pick up the
state both for the operators with a uid and for the ones without (using the
generated uid), provided the application has not changed?
Thanks again for your response.
Mans
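
For reference, a minimal sketch (not from this thread) of assigning explicit
uids to both stateful and stateless operators so savepoint state can be mapped
back deterministically; the pipeline itself is a placeholder.

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "a", "c").uid("word-source").name("Word Source")
            .map(String::toLowerCase).uid("normalize").name("Normalize") // stateless, uid still recommended
            .keyBy(w -> w)
            .reduce((ReduceFunction<String>) (a, b) -> a + b)            // stateful keyed reduce
            .uid("word-reduce").name("Word Reduce")
            .print().uid("print-sink").name("Print Sink");

        env.execute("uid sketch");
    }
}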
On Monday, November 25, 2019, 09:24:42 AM EST, Congxian Qiu 
 wrote:  
 
 Hi

The problem is that the specified uid does not exist in the new job.
1. As far as I know, the answer is yes. There are some operators that have their
own state (such as window state); could you please share the minimal code of
your job?
2. *Truly* stateless operators do not need to have a uid, but for the reason
described above, assigning a uid to all operators is recommended.
3. If the previous job is still there, I'm not sure we can find the operatorId
in the UI easily; maybe other people can answer that question.
4. For this purpose, maybe you can debug the savepoint meta with the new job
locally; maybe CheckpointMetadataLoadingTest can help.
5. For this problem, 1.9 is the same as 1.6.

Best,
Congxian

Kostas Kloudas  于2019年11月25日周一 下午9:42写道:

As a side note, I am assuming that you are using the same Flink Job
before and after the savepoint and the same Flink version.
Am I correct?

Cheers,
Kostas

On Mon, Nov 25, 2019 at 2:40 PM Kostas Kloudas  wrote:
>
> Hi Singh,
>
> This behaviour is strange.
> One thing I can recommend to see if the two jobs are identical is to
> launch also the second job without a savepoint,
> just start from scratch, and simply look at the web interface to see
> if everything is there.
>
> Also could you please provide some code from your job, just to see if
> there is anything problematic with the application code?
> Normally there should be no problem with not providing UIDs for some
> stateless operators.
>
> Cheers,
> Kostas
>
> On Sat, Nov 23, 2019 at 11:16 AM M Singh  wrote:
> >
> >
> > Hey Folks:
> >
> > Please let me know how to resolve this issue since using 
> > --allowNonRestoredState without knowing if any state will be lost seems 
> > risky.
> >
> > Thanks
> > On Friday, November 22, 2019, 02:55:09 PM EST, M Singh 
> >  wrote:
> >
> >
> > Hi:
> >
> > I have a flink application in which some of the operators have uid and name 
> > and some stateless ones don't.
> >
> > I've taken a save point and tried to start another instance of the 
> > application from a savepoint - I get the following exception which 
> > indicates that the operator is not available to the new program even though 
> > the second job is the same as first but just running from the first jobs 
> > savepoint.
> >
> > Caused by: java.lang.IllegalStateException: Failed to rollback to 
> > checkpoint/savepoint 
> > s3://mybucket/state/savePoint/mysavepointfolder/66s4c6402d7532801287290436fa9fadd/savepoint-664c64-fa235d26d379.
> >  Cannot map checkpoint/savepoint state for operator 
> > d1a56c5a9ce5e3f1b03e01cac458bb4f to the new program, because the operator 
> > is not available in the new program. If you want to allow to skip this, you 
> > can set the --allowNonRestoredState option on the CLI.
> >
> > at 
> > org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
> >
> > at 
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1102)
> >
> > at 
> > org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1219)
> >
> > at 
> > org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1143)
> >
> > at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:294)
> >
> > at 
> > org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)
> >
> > ... 10 more
> >
> >
> >
> > I've tried to start an application instance from the checkpoint too of the 
> > first instance but it gives the same exception indicating that the operator 
> > is not available.
> >
> > Questions:
> >
> > 1. If this a problem because some of the operators don't have uid ?
> > 2. Is it required to have uids even for stateless operators like simple map 
> > or filter operators ?
> > 3. Is there a way to find out which operator is not available in the new 
> > application even though I am running the same application ?
> > 4. Is there a way to figure out if this is the only missing operator or are 
> > there others whose mapping is missing for the second instance run ?
> > 5. Is this issue resolved in Apache Flink 1.9 (since I am still using Flink 
> > 1.6)

Re: Question about to modify operator state on Flink1.7 with State Processor API

2019-11-25 Thread vino yang
Hi Kaihao,

Ping @Aljoscha Krettek  @Tzu-Li (Gordon) Tai
 to give more professional suggestions.

What's more, we may need to clarify whether the State Processor API can process
snapshots generated by jobs on older versions. WDYT?

Best,
Vino

Kaihao Zhao  于2019年11月25日周一 下午11:39写道:

> Hi,
>
> We are running Flink 1.7 and recently due to Kafka cluster migration, we
> need to find a way to modify kafka offset in FlinkKafkaConnector's state,
> and we found Flink 1.9's State Processor API is the exactly tool we need,
> we are able to modify the operator state via State Processor API, but when
> trying to resume App from the modified savepoint, we found it failed with
> ClassNotFoundException: *TupleSerializerSnapshot*, these
> *TypeSerializerSnapshots* are new in Flink 1.9 but not in 1.7, so I
> wonder if there has any suggestion or workaround to modify 1.7's state?
>
> --
> Thanks & Regards
> Zhao Kaihao
>


Streaming Files to S3

2019-11-25 Thread Li Peng
Hey folks, I'm trying to stream large volume data and write them as csv
files to S3, and one of the restrictions is to try and keep the files to
below 100MB (compressed) and write one file per minute. I wanted to verify
with you guys regarding my understanding of StreamingFileSink:

1. From the docs, StreamingFileSink will use multipart upload with s3, so
even with many workers writing to s3, it will still output only one file
for all of them for each time window, right?
2. StreamingFileSink.forRowFormat can be configured to write individual
rows and then commit to disk as per the above rules, by specifying a
RollingPolicy with the file size limit and the rollover interval, correct?
And the limit and the interval applies to the entire file, not to each part
file?
3. To write to s3, is it enough to just add flink-s3-fs-hadoop as a
dependency and specify the file path as "s3://file"?
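
For reference, a minimal sketch of the row-format configuration being asked
about, assuming Flink 1.9's builder API; the bucket path, encoder and thresholds
are placeholders.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

// Rolls each in-progress part file at ~100 MB or at the latest after one minute.
StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("s3://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(
                DefaultRollingPolicy.create()
                        .withMaxPartSize(100L * 1024 * 1024)
                        .withRolloverInterval(60_000L)
                        .withInactivityInterval(60_000L)
                        .build())
        .build();
// csvStream.addSink(sink); // csvStream is a placeholder DataStream<String> of CSV lines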

Thanks,
Li


Re: Pre-process data before it hits the Source

2019-11-25 Thread vino yang
Hi Vijay,

IMO, the semantics of a source are not fixed. It can integrate with third-party
systems and consume events, but it can also contain more business logic to
pre-process your data after consuming the events.

Maybe it needs some customization. WDYT?
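
For illustration, a hedged sketch of such customization: a custom
DeserializationSchema that converts each raw record into the desired format
inside the source itself. The record format, target type and transformation are
placeholders.

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class TransformingDeserializationSchema implements DeserializationSchema<String> {

    @Override
    public String deserialize(byte[] message) throws IOException {
        String raw = new String(message, StandardCharsets.UTF_8);
        return raw.trim().toUpperCase(); // placeholder transformation
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}

// Usage sketch (stream name and properties are placeholders):
// FlinkKinesisConsumer<String> consumer =
//         new FlinkKinesisConsumer<>("my-stream", new TransformingDeserializationSchema(), consumerProps);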

Best,
Vino

Vijay Balakrishnan  于2019年11月26日周二 上午6:45写道:

> Hi,
> Need to pre-process data(transform incoming data to a different format)
> before it hits the Source I have defined. How can I do that ?
>
> I tried to use a .map on the DataStream but that is too late as the data
> has already hit the Source I defined.
> FlinkKinesisConsumer> kinesisConsumer =
> getMonitoringFlinkKinesisConsumer(local, localKinesis, kinesisTopicRead,
> region, getRecsMax, getRecsIntervalMs, connectionTimeout, maxConnections,
> socketTimeout);
> DataStreamSource> monitoringDataStreamSource =
> env.addSource(kinesisConsumer);
>
> DataStream> kinesisStream1 = kinesisStream.map(new
> TransformFunction(...));//too late here
>
> TIA,
>


Re: Dynamically creating new Task Managers in YARN

2019-11-25 Thread Piper Piper
Hi Yang,

Session mode is working exactly as you described. No exceptions.

Thank you!

Piper


On Sun, Nov 24, 2019 at 11:24 PM Yang Wang  wrote:

> Hi Piper,
>
> In session mode, Flink will always use the free slots in the existing
> TaskManagers first.
> When it cannot fulfill the slot request, new TaskManagers will be
> started.
> Did you find some exceptions?
>
> Best,
> Yang
>
> Piper Piper  于2019年11月23日周六 上午8:52写道:
>
>> Hello Yang,
>>
>> Thank you for the explanation!
>>
>> I want to control the amount of TaskManagers in order to have finer
>> control over allowing/rejecting certain jobs in the cluster.
>>
>> In Session mode with multiple jobs, is there any way to control whether
>> Flink will fit a new job into empty slots in existing Task Managers versus
>> starting new TaskManagers for every new job?
>>
>> Thank you,
>>
>> Piper
>>
>> On Thu, Nov 21, 2019 at 10:53 PM Yang Wang  wrote:
>>
>>> Hi Piper,
>>>
>>> Jingsong is right. Both per-job and session cluster, the
>>> YarnResourceManager will allocate
>>> taskmanager containers dynamically on demand.
>>>
>>> For per-job cluster, it will allocate taskmanagers base on the job slot
>>> demand. The excess
>>> containers will return to yarn immediately. When the job finished,
>>> jobmanager and all
>>> taskmanagers will be released.
>>> For sesion-cluster, the YarnResourceManager will not have any
>>> taskmanagers on started.
>>> Once the job is submitted, it will allocate the taskmanagers. When the
>>> job finished, the
>>> taskmanagers will enter into idle and be released after the timeout. The
>>> jobmanager will
>>> be long-running unless manually stop the session.
>>>
>>> I'm just curious why do you want to control the amounts of taskmanagers.
>>> Because they are
>>> always allocated on demand.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Piper Piper  于2019年11月22日周五 上午11:02写道:
>>>
 Thank you, I will check it out.

 On Thu, Nov 21, 2019, 9:21 PM Jingsong Li 
 wrote:

> Hi Piper,
>
> AFAIK, There are no these flexible operations. You can get some
> information from metrics, but you can not control them.
> Maybe you should modify some source code in flink-yarn.
>
> Best,
> Jingsong Lee
>
>
> On Thu, Nov 21, 2019 at 8:17 PM Piper Piper 
> wrote:
>
>> Hi Jingsong,
>>
>> Thank you for your reply!
>>
>> >Is this what you want? Piper.
>>
>> Yes. This is exactly what I want.
>>
>> Is there any way for me to specify to Flink RM how much of resources
>> to ask YARN's RM for, and if we want Flink's RM to ask for resources
>> proactively before it runs out?
>> Similarly, is there any way I can force the JM to release TM back to
>> YARN before timeout?
>>
>> Or will I need to modify the source code of Flink for this?
>>
>> Thank you,
>>
>> Piper
>>
>> On Thu, Nov 21, 2019 at 2:17 AM vino yang 
>> wrote:
>>
>>> Hi Jingsong,
>>>
>>> Thanks for the explanation about the mechanism of the new Flink
>>> session cluster mode.
>>>
>>> Because I mostly use job cluster mode, so did not have a good
>>> knowledge of the new Flink session cluster mode.
>>>
>>> Best,
>>> Vino
>>>
>>> Jingsong Li  于2019年11月21日周四 下午2:46写道:
>>>
 Hi Piper and Vino:

 Current Flink version, the resources of Flink Session cluster
 are unrestricted, which means if the requested resources exceed the
 resources owned by the current session, it will apply to the RM of 
 yarn for
 new resources.
 And if TaskManager is idle for too long, JM will release it to
 yarn. This behavior is controlled by 
 resourcemanager.taskmanager-timeout .
 You can set a suitable value for it to enjoy the benefits of reuse 
 process
 and dynamic resources.

 From this point of view, I think session mode is a good choice.
 Is this what you want? Piper.

 Best,
 Jingsong Lee



 On Thu, Nov 21, 2019 at 2:25 PM vino yang 
 wrote:

> Hi Piper,
>
> The understanding of two deploy modes For Flink on Yarn is right.
>
> AFAIK, The single job (job cluster) mode is more popular than
> Session mode.
>
> Because job cluster mode, Flink let YARN manage resources as far
> as possible. And this mode can keep isolation from other jobs.
>
> IMO, we do not need to combine their advantages. Let YARN do the
> things that it is good at. What do you think?
>
> Best,
> Vino
>
>
> Piper Piper  于2019年11月21日周四 上午11:55写道:
>
>> Hi Vino,
>>
>> I want to implement Resource Elasticity. In doing so, I have read
>> that Flink with YARN has two modes: Job and Session.

Flink 1.9.1 KafkaConnector missing data (1M+ records)

2019-11-25 Thread Harrison Xu
Hello,

We're seeing some strange behavior with flink's KafkaConnector010 (Kafka
0.10.1.1) arbitrarily skipping data.

*Context*
KafkaConnector010 is used as source, and StreamingFileSink/BulkPartWriter
(S3) as sink with no intermediate operators. Recently, we noticed that
millions of Kafka records were missing for *one* topic partition (this job
is running for 100+ topic partitions, and such behavior was only observed
for one). This job is run on YARN, and hosts were healthy with no hardware
faults observed. No exceptions in jobmanager or taskmanager logs at this
time.

*How was this detected?*
As a sanity check, we dual-write Kafka metadata (offsets) to a separate
location in S3, and have monitoring to ensure that written offsets are
contiguous with no duplicates.
Each Kafka record is bucketed into hourly datetime partitions (UTC) in S3.

*(Condensed) Taskmanager logs*
2019-11-24 02:36:50,140 INFO
 org.apache.flink.fs.s3.common.writer.S3Committer  - Committing
kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5252
with MPU ID 3XG...
2019-11-24 02:41:27,966 INFO
 org.apache.flink.fs.s3.common.writer.S3Committer  - Committing
kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5253
with MPU ID 9MW...
2019-11-24 02:46:29,153 INFO
 org.apache.flink.fs.s3.common.writer.S3Committer  - Committing
kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5254
with MPU ID 7AP...
2019-11-24 02:51:32,602 INFO
 org.apache.flink.fs.s3.common.writer.S3Committer  - Committing
kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5255
with MPU ID xQU...

*2019-11-24 02:56:35,183 INFO
 org.apache.flink.fs.s3.common.writer.S3Committer  - Committing
kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5256
with MPU ID pDL...*

*2019-11-24 03:01:26,059 INFO
 org.apache.flink.fs.s3.common.writer.S3Committer  - Committing
kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5257
with MPU ID Itf...*
*2019-11-24 03:01:26,510 INFO
 org.apache.flink.fs.s3.common.writer.S3Committer  - Committing
kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5263
with MPU ID e3l...*
2019-11-24 03:06:26,230 INFO
 org.apache.flink.fs.s3.common.writer.S3Committer  - Committing
kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5264
with MPU ID 5z4...
2019-11-24 03:11:22,711 INFO
 org.apache.flink.fs.s3.common.writer.S3Committer  - Committing
kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5265
with MPU ID NfP...

Two observations stand out from the above logs:
- Datetime *2019-11-24T01* and *2019-11-24T02* are entirely skipped,
resulting in millions of missing offsets. They are never written in future
commits (and data in S3 shows this).
- Two commits for the *same *topic partition ("digest_features", partition
4), happened nearly simultaneously on 2019-11-24 03:03, despite our commit
interval being set at 5 minutes. Why was the same TopicPartition read from
and committed twice in such a short interval?

Would greatly appreciate if anyone is able to shed light on this issue.
Happy to provide full logs if needed.
Thanks


Pre-process data before it hits the Source

2019-11-25 Thread Vijay Balakrishnan
Hi,
Need to pre-process data(transform incoming data to a different format)
before it hits the Source I have defined. How can I do that ?

I tried to use a .map on the DataStream but that is too late as the data
has already hit the Source I defined.
FlinkKinesisConsumer> kinesisConsumer =
getMonitoringFlinkKinesisConsumer(local, localKinesis, kinesisTopicRead,
region, getRecsMax, getRecsIntervalMs, connectionTimeout, maxConnections,
socketTimeout);
DataStreamSource> monitoringDataStreamSource =
env.addSource(kinesisConsumer);

DataStream> kinesisStream1 = kinesisStream.map(new
TransformFunction(...));//too late here

TIA,


Re: Metrics for Task States

2019-11-25 Thread Kelly Smith
Thanks Caizhi, that was what I was afraid of. Thanks for the information on the 
REST API 

It seems like the right solution would be to add it as a first-class feature 
for Flink so I will add a feature request. I may end up using the REST API as a 
workaround in the short-term - probably with a side-car container once we move 
to Kubernetes.

Kelly

From: Caizhi Weng 
Date: Monday, November 25, 2019 at 1:41 AM
To: Kelly Smith 
Cc: Piper Piper , "user@flink.apache.org" 

Subject: Re: Metrics for Task States

Hi Kelly,

As far as I know, Flink currently does not have such metrics to monitor the
number of tasks in each state. See 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
 for the complete metrics list. (It seems that `taskSlotsAvailable` in the 
metrics list is the most related metrics).

But Flink has a REST API which can provide the states of all the tasks
(http://hostname:port/overview). This REST call returns a JSON string containing
all the metrics you want. Maybe you can write your own tool to monitor this API.
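
For reference, a minimal sketch of polling such a REST endpoint from Java using
only JDK classes; the host, port and endpoint path are placeholders based on the
text above, and the JSON is printed rather than parsed.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class RestOverviewPoller {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:8081/overview"); // placeholder host/port
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            // A monitoring side-car could parse this JSON and emit its own metrics.
            System.out.println(body);
        } finally {
            conn.disconnect();
        }
    }
}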

If you really want to have metrics that describe the number of tasks in each
state, you can open up a JIRA ticket at 
https://issues.apache.org/jira/projects/FLINK/issues/

Thank you

Kelly Smith mailto:kell...@zillowgroup.com>> 
于2019年11月25日周一 上午12:59写道:
With EMR/YARN, the cluster is definitely running in session mode. It exists 
independently of any job and continues running after the job exits.
Whether or not this is a bug in Flink, is it possible to get access to the 
metrics I'm asking about? Those would be useful even if this behavior is fixed.
Get Outlook for 
Android


From: Piper Piper mailto:piperfl...@gmail.com>>
Sent: Friday, November 22, 2019 9:10:41 PM
To: Kelly Smith mailto:kell...@zillowgroup.com>>
Cc: user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject: Re: Metrics for Task States

I am trying to reason why this problem should occur (i.e. why Flink could not 
reject the job when it required more slots than were available).

Flink in production on EMR (YARN): Does this mean Flink was being run in Job 
mode or Session mode?

Thank you,

Piper

On Thu, Nov 21, 2019 at 4:56 PM Piper Piper 
mailto:piperfl...@gmail.com>> wrote:
Thank you, Kelly!

On Thu, Nov 21, 2019 at 4:06 PM Kelly Smith 
mailto:kell...@zillowgroup.com>> wrote:

Hi Piper,



The repro is pretty simple:

  *   Submit a job with parallelism set higher than YARN has resources to 
support



What this ends up looking like in the Flink UI is this:
[screenshot: Flink UI showing the job in RUNNING state with all tasks SCHEDULED]



The Job is in a “RUNNING” state, but all of the tasks are in the “SCHEDULED” 
state. The `jobmanager.numRunningJobs` metric that Flink emits by default will 
increase by 1, but none of the tasks actually get scheduled on any TM.



[screenshot: metrics dashboard]



What I’m looking for is a way to detect when I am in this state using Flink 
metrics (ideally the count of tasks in each state for better observability).



Does that make sense?



Thanks,

Kelly



From: Piper Piper mailto:piperfl...@gmail.com>>
Date: Thursday, November 21, 2019 at 12:59 PM
To: Kelly Smith mailto:kell...@zillowgroup.com>>
Cc: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Subject: Re: Metrics for Task States



Hello Kelly,



I thought that Flink scheduler only starts a job if all requested 
containers/TMs are available and allotted to that job.



How can I reproduce your issue on Flink with YARN?



Thank you,



Piper





On Thu, Nov 21, 2019, 1:48 PM Kelly Smith 
mailto:kell...@zillowgroup.com>> wrote:

I’ve been running Flink in production on EMR (YARN) for some time and have 
found the metrics system to be quite useful, but there is one specific case 
where I’m missing a signal for this scenario:



  *   When a job has been submitted, but YARN does not have enough resources to 
provide



Observed:

  *   Job is in RUNNING state
  *   All of the tasks for the job are in the (I believe) DEPLOYING state



Is there a way to 

Re: Completed job wasn't saved to archive

2019-11-25 Thread Chesnay Schepler
I'm afraid I can't think of a solution. I don't see how this operation can
succeed or fail without anything being logged.


Is the cluster behaving normally afterwards? Could you check whether the 
numRunningJobs ticks down properly after the job was canceled?



On 22/11/2019 13:27, Pavel Potseluev wrote:

Hi Chesnay,
We archive jobs on an s3 file system. We don't configure any throttling for
write operations; afaik that isn't possible now and will be
implemented in FLINK-13251. Other write
operations (like checkpoint saving) work fine. But I don't see an
archived job or any message about an archiving failure at all. It looks like
Flink just didn't try to save the job to the archive.

21.11.2019, 17:17, "Chesnay Schepler" :

If the archiving fails there should be some log message, like
"Failed to archive job" or "Could not archive completed job..." .
If nothing of the sort is logged my first instinct would be that
the operation is being slowed down, _a lot_.
Where are you archiving them to? Could it be the write operation
is being throttled heavily?
On 21/11/2019 13:48, Pavel Potseluev wrote:

Hi Vino,
Usually Flink archives jobs correctly and the problem is
rarely reproduced. So I think it isn't a problem with
configuration.
Job Manager log when job 5ec264a20bb5005cdbd8e23a5e59f136 was
canceled:

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18
18:52:13.294 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Triggering checkpoint 1872 @ 1574092333218 for job
5ec264a20bb5005cdbd8e23a5e59f136.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18
18:52:37.260 [flink-akka.actor.default-dispatcher-30] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Completed checkpoint 1872 for job
5ec264a20bb5005cdbd8e23a5e59f136 (568048140 bytes in 23541
ms).

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18
18:53:13.314 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Triggering checkpoint 1873 @ 1574092393218 for job
5ec264a20bb5005cdbd8e23a5e59f136.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18
18:53:19.279 [flink-akka.actor.default-dispatcher-40] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph -
Job bureau-user-offers-statistics-AUTORU-USERS_AUTORU
(5ec264a20bb5005cdbd8e23a5e59f136) switched from state
RUNNING to CANCELLING.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18
18:53:19.279 [flink-akka.actor.default-dispatcher-40] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph -
Source: Custom File Source (1/1)
(934d89cf3d7999b40225dd8009b5493c) switched from RUNNING
to CANCELING.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18
18:53:19.280 [flink-akka.actor.default-dispatcher-40] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph -
Source: kafka-source-moderation-update-journal-autoru ->
Filter -> Flat Map (1/2)
(47656a3c4fc70e19622acca31267e41f) switched from RUNNING
to CANCELING.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18
18:53:19.280 [flink-akka.actor.default-dispatcher-40] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph -
Source: kafka-source-moderation-update-journal-autoru ->
Filter -> Flat Map (2/2)
(be3c4562e65d3d6bdfda4f1632017c6c) switched from RUNNING
to CANCELING.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18
18:53:19.280 [flink-akka.actor.default-dispatcher-40] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph -
user-offers-statistics-init-from-file -> Map (1/2)
(4a45ed43b05e4d444e190a44b33514ac) switched from RUNNING
to CANCELING.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18
18:53:19.280 [flink-akka.actor.default-dispatcher-40] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph -
user-offers-statistics-init-from-file -> Map (2/2)
(bb3be311c5e53abaedb06b4d0148c23f) switched from RUNNING
to CANCELING.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18
18:53:19.280 [flink-akka.actor.default-dispatcher-40] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph -
Keyed Reduce -> Map -> Sink: user-offers-statistics-autoru
(1/2) (cfb291033df3f19c9745a6f2fd25e037) switched from
RUNNING to CANCELING.


Re: How to recover state from savepoint on embedded mode?

2019-11-25 Thread Yun Tang
What does embedded mode mean here? If you refer to the SQL embedded mode, you
cannot resume from a savepoint now; if you refer to a local standalone cluster,
you could use `bin/flink run -s` to resume on a local cluster.

Best
Yun Tang

From: Reo Lei 
Date: Tuesday, November 26, 2019 at 12:37 AM
To: "user@flink.apache.org" 
Subject: How to recover state from savepoint on embedded mode?

Hi,
I have a job need running on embedded mode, but need to init some rule data 
from a database before start. So I used the State Processor API to construct my 
state data and save it to the local disk. When I want to used this savepoint to 
recover my job, I found resume a job from a savepoint need to use the command 
`bin/flink run -s :savepointPath [:runArgs]` to submit a job to flink cluster. 
That is mean the job is run on remote mode, not embedded mode.

And I was wondering why I can't resume a job from a savepoint on embedded mode. 
If that is possible, what should I do?
BTW, if we can not  resume a job from a savepoint on embedded mode, how to know 
the savepoint is constructed correctly in develop environment and use idea to 
debug it?

BR,
Reo



How to recover state from savepoint on embedded mode?

2019-11-25 Thread Reo Lei
Hi,
I have a job that needs to run in embedded mode, but it needs to initialize some
rule data from a database before starting. So I used the State Processor API to
construct my state data and save it to the local disk. When I wanted to use
this savepoint to recover my job, I found that resuming a job from a savepoint
requires the command `bin/flink run -s :savepointPath [:runArgs]` to
submit the job to a Flink cluster. That means the job runs in remote mode,
not embedded mode.

And I was wondering why I can't resume a job from a savepoint in embedded
mode. If it is possible, what should I do?
BTW, if we cannot resume a job from a savepoint in embedded mode, how can we
verify that the savepoint was constructed correctly in a development environment
and debug it in the IDE?

BR,
Reo


Re: How to submit two jobs sequentially and view their outputs in .out file?

2019-11-25 Thread Piotr Nowojski
Hi,

I would suggest the same thing as Vino did: it might be possible to use stdout 
somehow, but it’s a better idea to coordinate in some other way. Produce some 
(side?) output with a control message from one job once it finishes, that will 
control the second job.

Piotrek

> On 25 Nov 2019, at 09:38, vino yang  wrote:
> 
> Hi Komal,
> 
> > Thank you! That's exactly what's happening. Is there any way to force it 
> > write to a specific .out of a TaskManager?
> 
> No, I am curious why the two jobs depend on stdout? Can we introduce another 
> coordinator other than stdout? IMO, this mechanism is not always available.
> 
> Best,
> Vino
> 
> Komal Mariam mailto:komal.mar...@gmail.com>> 
> 于2019年11月25日周一 上午10:46写道:
> Hi Theo,
> 
> I want to interrupt/cancel my current job as it has produced the desired 
> results even though it runs infinitely,  and the next one requires full 
> resources. 
> 
> Due to some technical issue we cannot access the web UI so just working with 
> the CLI, for now. 
> 
> I found a less crude way by running the command ./bin/flink cancel   
> specified by the commands listed here: 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html 
> 
> 
> 
> 
> Hello Vino,
> 
> Thank you! That's exactly what's happening. Is there any way to force it 
> write to a specific .out of a TaskManager?
> 
> 
> Best Regards,
> Komal
> 
> 
> 
> On Mon, 25 Nov 2019 at 11:10, vino yang  > wrote:
> Hi Komal,
> 
> Since you use the Flink standalone deployment mode, the tasks of the jobs 
> which print information to the STDOUT may randomly deploy in any task manager 
> of the cluster. Did you check other Task Managers out file?
> 
> Best,
> Vino
> 
> Komal Mariam mailto:komal.mar...@gmail.com>> 
> 于2019年11月22日周五 下午6:59写道:
> Dear all,
> 
> Thank you for your help regarding my previous queries. Unfortunately, I'm 
> stuck with another one and will really appreciate your input. 
> 
> I can't seem to produce any outputs in "flink-taskexecutor-0.out" from my 
> second job after submitting the first one in my 3-node-flink standalone 
> cluster.
> 
> Say I want to test out two jobs sequentially. (I do not want to run them 
> concurrently/in parallel).
> 
> After submitting "job1.jar " via command line, I press "Ctrl + C" to exit 
> from it (as it runs infinitely). After that I 
> try to submit a second jar file having the same properties (group-id, topic, 
> etc) with the only difference being the query written in main function.
> 
> The first job produces relevant outputs in "flink-taskexecutor-0.out" but the 
> second one doesn't.
> 
> The only way I can see the output produced is if I restart the cluster after 
> job1 and then submit job2 as it produces another .out file.
> 
> But I want to submit 2 jobs sequentially and see their outputs without having 
> to restart my cluster. Is there any way to do this?
> 
> Additional info:
> For both jobs I'm using DataStream API and I have set: 
>  StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> 
> Best Regards,
> Komal



Question about to modify operator state on Flink1.7 with State Processor API

2019-11-25 Thread Kaihao Zhao
Hi,

We are running Flink 1.7 and recently due to Kafka cluster migration, we
need to find a way to modify kafka offset in FlinkKafkaConnector's state,
and we found Flink 1.9's State Processor API is exactly the tool we need,
we are able to modify the operator state via State Processor API, but when
trying to resume App from the modified savepoint, we found it failed with
ClassNotFoundException: *TupleSerializerSnapshot*, these
*TypeSerializerSnapshots* are new in Flink 1.9 but not in 1.7, so I wonder
if there has any suggestion or workaround to modify 1.7's state?

-- 
Thanks & Regards
Zhao Kaihao


Re: Apache Flink - Troubleshooting exception while starting the job from savepoint

2019-11-25 Thread Congxian Qiu
Hi

The problem is that the specified uid does not exist in the new job.
1. As far as I know, the answer is yes. There are some operators that have their
own state (such as window state); could you please share the minimal code of
your job?
2. *Truly* stateless operators do not need to have a uid, but for the reason
described above, assigning a uid to all operators is recommended.
3. If the previous job is still there, I'm not sure we can find the operatorId
in the UI easily; maybe other people can answer that question.
4. For this purpose, maybe you can debug the savepoint meta with the new job
locally; maybe CheckpointMetadataLoadingTest can help.
5. For this problem, 1.9 is the same as 1.6.


Best,
Congxian


Kostas Kloudas  于2019年11月25日周一 下午9:42写道:

> As a side note, I am assuming that you are using the same Flink Job
> before and after the savepoint and the same Flink version.
> Am I correct?
>
> Cheers,
> Kostas
>
> On Mon, Nov 25, 2019 at 2:40 PM Kostas Kloudas  wrote:
> >
> > Hi Singh,
> >
> > This behaviour is strange.
> > One thing I can recommend to see if the two jobs are identical is to
> > launch also the second job without a savepoint,
> > just start from scratch, and simply look at the web interface to see
> > if everything is there.
> >
> > Also could you please provide some code from your job, just to see if
> > there is anything problematic with the application code?
> > Normally there should be no problem with not providing UIDs for some
> > stateless operators.
> >
> > Cheers,
> > Kostas
> >
> > On Sat, Nov 23, 2019 at 11:16 AM M Singh  wrote:
> > >
> > >
> > > Hey Folks:
> > >
> > > Please let me know how to resolve this issue since using
> --allowNonRestoredState without knowing if any state will be lost seems
> risky.
> > >
> > > Thanks
> > > On Friday, November 22, 2019, 02:55:09 PM EST, M Singh <
> mans2si...@yahoo.com> wrote:
> > >
> > >
> > > Hi:
> > >
> > > I have a flink application in which some of the operators have uid and
> name and some stateless ones don't.
> > >
> > > I've taken a save point and tried to start another instance of the
> application from a savepoint - I get the following exception which
> indicates that the operator is not available to the new program even though
> the second job is the same as first but just running from the first jobs
> savepoint.
> > >
> > > Caused by: java.lang.IllegalStateException: Failed to rollback to
> checkpoint/savepoint
> s3://mybucket/state/savePoint/mysavepointfolder/66s4c6402d7532801287290436fa9fadd/savepoint-664c64-fa235d26d379.
> Cannot map checkpoint/savepoint state for operator
> d1a56c5a9ce5e3f1b03e01cac458bb4f to the new program, because the operator
> is not available in the new program. If you want to allow to skip this, you
> can set the --allowNonRestoredState option on the CLI.
> > >
> > > at
> org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
> > >
> > > at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1102)
> > >
> > > at
> org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1219)
> > >
> > > at
> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1143)
> > >
> > > at
> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:294)
> > >
> > > at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)
> > >
> > > ... 10 more
> > >
> > >
> > >
> > > I've tried to start an application instance from the checkpoint too of
> the first instance but it gives the same exception indicating that the
> operator is not available.
> > >
> > > Questions:
> > >
> > > 1. If this a problem because some of the operators don't have uid ?
> > > 2. Is it required to have uids even for stateless operators like
> simple map or filter operators ?
> > > 3. Is there a way to find out which operator is not available in the
> new application even though I am running the same application ?
> > > 4. Is there a way to figure out if this is the only missing operator
> or are there others whose mapping is missing for the second instance run ?
> > > 5. Is this issue resolved in Apache Flink 1.9 (since I am still using
> Flink 1.6)
> > >
> > > If there any additional pointers please let me know.
> > >
> > > Thanks
> > >
> > > Mans
> > >
> > >
>


Re: Window-join DataStream (or KeyedStream) with a broadcast stream

2019-11-25 Thread Piotr Nowojski
Hi,

So you are trying to use the same window definition, but you want to aggregate 
the data in two different ways:

1. keyBy(userId)
2. Global aggregation

Do you want to use exactly the same aggregation functions? If not, you can just 
process the events twice:


DataStream<…> events = …;

DataStream<….> keyedEvents = events
.keyBy(…)
.window(…)
.process(f) // instead of process this can be whatever you want, 
aggregate/apply/reduce/...
DataStream<….> nonKeyedEvents = events
.windowAll(…)
.process(g)


From here you can process keyedEvents and nonKeyedEvents as you prefer.

If yes, if both global and non global aggregation are using similar/the same 
aggregation function, you could try to use `keyedEvents` as pre-aggregated 
input for `.windowAll(…)`. 


DataStream<….> keyedEvents = events.keyBy(…).window(…).process(f) 

keyedEvents.print() // or further process keyedEvents
DataStream<….> nonKeyedEvents = keyedEvents.windowAll(…).process(f')


But this assumes that the output of your `process(f)` can be re-processed. This
second approach can minimise the amount of work to be done by the global
aggregation. In the first approach, all of the records have to be processed by a
single operator (the global aggregation), which can be a performance bottleneck.
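
For reference, a hedged sketch of the broadcast/connect approach raised in the
original question, building on the `keyedEvents` / `nonKeyedEvents` streams
above, assuming both carry Tuple2<Long, Double> of (window end, aggregate); the
key choice, types and output are placeholders, and late-data handling is omitted.

MapStateDescriptor<Long, Double> globalAggDescriptor =
        new MapStateDescriptor<>("global-agg", Types.LONG, Types.DOUBLE);

BroadcastStream<Tuple2<Long, Double>> globalBroadcast =
        nonKeyedEvents.broadcast(globalAggDescriptor);

DataStream<String> perUserVsGlobal = keyedEvents
        .keyBy(t -> t.f0) // re-key by window end (placeholder key choice)
        .connect(globalBroadcast)
        .process(new KeyedBroadcastProcessFunction<Long, Tuple2<Long, Double>, Tuple2<Long, Double>, String>() {

            @Override
            public void processElement(Tuple2<Long, Double> perUser, ReadOnlyContext ctx,
                                       Collector<String> out) throws Exception {
                Double global = ctx.getBroadcastState(globalAggDescriptor).get(perUser.f0);
                if (global != null) {
                    out.collect("window " + perUser.f0 + ": user=" + perUser.f1 + " global=" + global);
                }
                // If the global aggregate has not arrived yet, it could be buffered in keyed state.
            }

            @Override
            public void processBroadcastElement(Tuple2<Long, Double> globalAgg, Context ctx,
                                                Collector<String> out) throws Exception {
                ctx.getBroadcastState(globalAggDescriptor).put(globalAgg.f0, globalAgg.f1);
            }
        });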

Piotrek

> On 24 Nov 2019, at 14:20, natasky  wrote:
> 
> Hi all,
> 
> I use window aggregation to create a stream of aggregated data per user, per
> some interval.
> 
> Additionally, I use same windows to aggregate system-wide data per the same
> interval.
> 
> I.e.:
> 
> Per user stream: events keyed by user ID -> tumbling window -> aggregation
> 
> System wide stream: events -> tumbling window (windowAll) -> aggregation
> 
> I need to produce a value per user, per interval, that depends on the
> aggregated
> data from that user and the system wide data aggregated for the
> corresponding
> interval.
> 
> I couldn't find a way to achieve this with Flink's windows. I think I can
> get
> it to work with broadcast, connect and CoProcessFunction - is that the way
> to
> go? How would I handle late events that way?
> 
> Thanks!
> - Nathan
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Flink distributed runtime architucture

2019-11-25 Thread Piotr Nowojski
Hi,

I’m glad to hear that you are interested in Flink! :)

>  In the picture, keyBy window and apply operators share the same circle. Is 
> is because these operators are chaining together? 

It’s not as much about chaining, as the chain of DataStream API invocations 
`someStream.keyBy(…).window(…).apply(…)` creates a single logical operations - 
each one of them on it’s own doesn’t make sense, but together they define how a 
single `WindowOperator` should behave (`keyBy` additionally defines how records 
should be shuffle).

Chaining happens _usually_ between “shuffling” boundaries. So for example:

someStream
.map(…) // first chain …
.filter(…) // … still first chain
.keyBy(…) // operator chain boundary
.window(…).apply(…) // beginning of 2nd chain
.map(…) // 2nd chain
.filter(…) // still 2nd chain …
.keyBy(…) // operator chain boundary
(…)

>  the repartition process happens both inside TaskManager and between 
> TaskManager. Inside TaskManger, the data transmit overload maybe just in 
> memory.

Yes, exactly. Data transfer happens in-memory, however records are still being
serialised and deserialised over "local input channels" (that's what we call
communication between operators inside a single TaskManager).

> Between TaskManager, the data transmit overload may inter-process or 
> inter-container, depending on how I deploy the Flink cluster. Is my 
> understanding right? 

Yes, between TaskManagers network sockets are always used, regardless if they 
are happening on one physical machine (localhost) or not.

> These details may highly related to Actor model? As I have little knowledge 
> of Actor model.

I’m not sure if I fully understand your questions. Flink is not using Actor 
model for the data pipelines.

I hope that helps :)

Piotrek

> On 24 Nov 2019, at 07:35, Lu Weizheng  wrote:
> 
> Hi all,
> 
> I have been paying attention on Flink for about half a year and have read 
> official documents several times. I have already got a comprehensive 
> understanding of Flink distributed runtime architecture, but still have some 
> questions that need to be clarify.
> 
> 
> 
> On Flink documents website, this picture shows the dataflow model of Flink. 
> In the picture, keyBy window and apply operators share the same circle. Is is 
> because these operators are chaining together? 
> 
> 
> 
> In the parallelized view, data stream is partition into multiple partitions. 
> Each partition is a subset of source data. Repartition happens when we use 
> keyBy operator. If these tasks share task slots and run like picture above, 
> the repartition process happens both inside TaskManager and between 
> TaskManager. Inside TaskManger, the data transmit overload maybe just in 
> memory. Between TaskManager, the data transmit overload may inter-process or 
> inter-container, depending on how I deploy the Flink cluster. Is my 
> understanding right? These details may highly related to Actor model? As I 
> have little knowledge of Actor model.
> 
> This is my first time to use Flink maillist. Thank you so much if anyone can 
> explain it.
> 
> Weizheng



Re: Apache Flink - Troubleshooting exception while starting the job from savepoint

2019-11-25 Thread Kostas Kloudas
As a side note, I am assuming that you are using the same Flink Job
before and after the savepoint and the same Flink version.
Am I correct?

Cheers,
Kostas

On Mon, Nov 25, 2019 at 2:40 PM Kostas Kloudas  wrote:
>
> Hi Singh,
>
> This behaviour is strange.
> One thing I can recommend to see if the two jobs are identical is to
> launch also the second job without a savepoint,
> just start from scratch, and simply look at the web interface to see
> if everything is there.
>
> Also could you please provide some code from your job, just to see if
> there is anything problematic with the application code?
> Normally there should be no problem with not providing UIDs for some
> stateless operators.
>
> Cheers,
> Kostas
>
> On Sat, Nov 23, 2019 at 11:16 AM M Singh  wrote:
> >
> >
> > Hey Folks:
> >
> > Please let me know how to resolve this issue since using 
> > --allowNonRestoredState without knowing if any state will be lost seems 
> > risky.
> >
> > Thanks
> > On Friday, November 22, 2019, 02:55:09 PM EST, M Singh 
> >  wrote:
> >
> >
> > Hi:
> >
> > I have a flink application in which some of the operators have uid and name 
> > and some stateless ones don't.
> >
> > I've taken a save point and tried to start another instance of the 
> > application from a savepoint - I get the following exception which 
> > indicates that the operator is not available to the new program even though 
> > the second job is the same as first but just running from the first jobs 
> > savepoint.
> >
> > Caused by: java.lang.IllegalStateException: Failed to rollback to 
> > checkpoint/savepoint 
> > s3://mybucket/state/savePoint/mysavepointfolder/66s4c6402d7532801287290436fa9fadd/savepoint-664c64-fa235d26d379.
> >  Cannot map checkpoint/savepoint state for operator 
> > d1a56c5a9ce5e3f1b03e01cac458bb4f to the new program, because the operator 
> > is not available in the new program. If you want to allow to skip this, you 
> > can set the --allowNonRestoredState option on the CLI.
> >
> > at 
> > org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
> >
> > at 
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1102)
> >
> > at 
> > org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1219)
> >
> > at 
> > org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1143)
> >
> > at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:294)
> >
> > at 
> > org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)
> >
> > ... 10 more
> >
> >
> >
> > I've tried to start an application instance from the checkpoint too of the 
> > first instance but it gives the same exception indicating that the operator 
> > is not available.
> >
> > Questions:
> >
> > 1. If this a problem because some of the operators don't have uid ?
> > 2. Is it required to have uids even for stateless operators like simple map 
> > or filter operators ?
> > 3. Is there a way to find out which operator is not available in the new 
> > application even though I am running the same application ?
> > 4. Is there a way to figure out if this is the only missing operator or are 
> > there others whose mapping is missing for the second instance run ?
> > 5. Is this issue resolved in Apache Flink 1.9 (since I am still using Flink 
> > 1.6)
> >
> > If there any additional pointers please let me know.
> >
> > Thanks
> >
> > Mans
> >
> >


Re: Apache Flink - Troubleshooting exception while starting the job from savepoint

2019-11-25 Thread Kostas Kloudas
Hi Singh,

This behaviour is strange.
One thing I can recommend to see if the two jobs are identical is to
launch also the second job without a savepoint,
just start from scratch, and simply look at the web interface to see
if everything is there.

Also could you please provide some code from your job, just to see if
there is anything problematic with the application code?
Normally there should be no problem with not providing UIDs for some
stateless operators.

Cheers,
Kostas

On Sat, Nov 23, 2019 at 11:16 AM M Singh  wrote:
>
>
> Hey Folks:
>
> Please let me know how to resolve this issue since using 
> --allowNonRestoredState without knowing if any state will be lost seems risky.
>
> Thanks
> On Friday, November 22, 2019, 02:55:09 PM EST, M Singh  
> wrote:
>
>
> Hi:
>
> I have a flink application in which some of the operators have uid and name 
> and some stateless ones don't.
>
> I've taken a save point and tried to start another instance of the 
> application from a savepoint - I get the following exception which indicates 
> that the operator is not available to the new program even though the second 
> job is the same as first but just running from the first jobs savepoint.
>
> Caused by: java.lang.IllegalStateException: Failed to rollback to 
> checkpoint/savepoint 
> s3://mybucket/state/savePoint/mysavepointfolder/66s4c6402d7532801287290436fa9fadd/savepoint-664c64-fa235d26d379.
>  Cannot map checkpoint/savepoint state for operator 
> d1a56c5a9ce5e3f1b03e01cac458bb4f to the new program, because the operator is 
> not available in the new program. If you want to allow to skip this, you can 
> set the --allowNonRestoredState option on the CLI.
>
> at 
> org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
>
> at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1102)
>
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1219)
>
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1143)
>
> at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:294)
>
> at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)
>
> ... 10 more
>
>
>
> I've tried to start an application instance from the checkpoint too of the 
> first instance but it gives the same exception indicating that the operator 
> is not available.
>
> Questions:
>
> 1. If this a problem because some of the operators don't have uid ?
> 2. Is it required to have uids even for stateless operators like simple map 
> or filter operators ?
> 3. Is there a way to find out which operator is not available in the new 
> application even though I am running the same application ?
> 4. Is there a way to figure out if this is the only missing operator or are 
> there others whose mapping is missing for the second instance run ?
> 5. Is this issue resolved in Apache Flink 1.9 (since I am still using Flink 
> 1.6)
>
> If there any additional pointers please let me know.
>
> Thanks
>
> Mans
>
>


Re: Per Operator State Monitoring

2019-11-25 Thread Piotr Nowojski
Hi,

I’m not sure if there is some simple way of doing that (maybe some other 
contributors will know more).

There are two potential ideas worth exploring:
- use periodically triggered save points for monitoring? If I remember 
correctly save points are never incremental
- use save point input/output format to analyse the content of the save point? 
[1]

I hope that someone else from the community will be able to help more here.

Piotrek

[1] https://flink.apache.org/feature/2019/09/13/state-processor-api.html 


> On 22 Nov 2019, at 22:48, Aaron Langford  wrote:
> 
> Hey Flink Community,
> 
> I'm working on a Flink application where we are implementing operators that 
> extend the RichFlatMap and RichCoFlatMap interfaces. As a result, we are 
> working directly with Flink's state API (ValueState, ListState, MapState). 
> Something that appears to be extremely valuable is having a way to monitor 
> the state size for each operator. My team has already run into a few cases 
> where our state has exploded and jobs fail because YARN kills containers who 
> are exceeding their memory limits.
> 
> It is my understanding that the way to best monitor this kind of thing by 
> watching checkpoint size per operator instance. This gets a little confusing 
> when doing incremental check-pointing because the numbers reported seem to be 
> a delta in state size, not the actual state size at that point in time. For 
> my teams application, the total state size is not the sum of those deltas. 
> What is the best way to get the total size of a checkpoint per operator for 
> each checkpoint?
> 
> Additionally, monitoring de-serializing and serializing state in a Flink 
> application is something that I haven't seen a great story for yet. It seems 
> that some of the really badly written Flink operators tend to do most poorly 
> when they demand lots of serde for each record. So giving visibility into how 
> well an application is handling these types of operations seems to be a 
> valuable guard rail for flink developers. Does anyone have existing solutions 
> for this, or are there pointers to some work that can be done to improve this 
> story?
> 
> Aaron



Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-11-25 Thread Piotr Nowojski
Hi,

Good to hear that you were able to work around the problem.

I’m not sure what the exact reason is why mmapped partitions caused those
failures, but you are probably right that they caused some memory exhaustion.
This memory is probably not capped by anything, but I would expect the kernel
to release it instead of killing the container - unless it was not the kernel’s
OOM killer that killed the container. Is that what’s happening: YARN keeps track
of the used memory, and Flink’s unbounded usage of mmap files caused the
container to exceed this limit?

I’ve asked some colleagues to take a look here, but most of them are busy this 
week with Flink Forward Asia, so they might not respond immediately.

Piotrek

> On 22 Nov 2019, at 14:51, Hailu, Andreas  wrote:
> 
> Zhijiang, Piotr, we made this change and it solved our mmap usage problem, so 
> we can move forward in our testing. Thanks.
>  
> I’m curious – if I’m understanding this change in 1.9 correctly, blocking 
> result partitions were being written to mmap which in turn resulted in 
> exhausting container memory? This is why we were seeing failures in our 
> pipelines which had operators which fed into a CoGroup?
>  
> // ah
>  
> From: Zhijiang
> Sent: Thursday, November 21, 2019 9:48 PM
> To: Hailu, Andreas [Engineering] ; Piotr 
> Nowojski 
> Cc: user@flink.apache.org
> Subject: Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?
>  
> The hint of mmap usage below is really helpful to locate this problem. I 
> forgot this biggest change for batch job in release-1.9.
> The blocking type option can be set to `file` as Piotr suggested to behave 
> similar as before. I think it can solve your problem. 
>  
> --
> From: Hailu, Andreas
> Send Time: 2019 Nov. 21 (Thu.) 23:37
> To: Piotr Nowojski
> Cc: Zhijiang; user@flink.apache.org
> Subject: RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?
>  
> Thanks, Piotr. We’ll rerun our apps today with this and get back to you.
>  
> // ah
>  
> From: Piotr Nowojski, On Behalf Of Piotr Nowojski
> Sent: Thursday, November 21, 2019 10:14 AM
> To: Hailu, Andreas [Engineering]
> Cc: Zhijiang; user@flink.apache.org
>
> Subject: Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?
>  
> Hi,
>  
> I would suspect this:
> https://issues.apache.org/jira/browse/FLINK-12070 
> 
> To be the source of the problems.
>  
> There seems to be a hidden configuration option that avoids using memory 
> mapped files:
>  
> taskmanager.network.bounded-blocking-subpartition-type: file
>  
> Could you test if helps?
>  
> Piotrek
>  
> 
> On 21 Nov 2019, at 15:22, Hailu, Andreas wrote:
>  
> Hi Zhijiang,
>  
> I looked into the container logs for the failure, and didn’t see any specific 
> OutOfMemory errors before it was killed. I ran the application using the same 
> config this morning on 1.6.4, and it went through successfully. I took a 
> snapshot of the memory usage from the dashboard and can send it to you if you 
> like for reference.
>  
> What stands out to me as suspicious is that on 1.9.1, the application is 
> using nearly 6GB of Mapped memory before it dies, while 1.6.4 uses 0 
> throughout its runtime and succeeds. The JVM heap memory itself never exceeds 
> its capacity, peaking at 6.65GB, so it sounds like the problem lies somewhere 
> in the changes around mapped memory.
>  
> // ah
>  
> From: Zhijiang
> Sent: Wednesday, November 20, 2019 11:32 PM
> To: Hailu, Andreas [Engineering]; user@flink.apache.org
>
> Subject: Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?
>  
> Hi Andreas,
>  
> You are running a batch job, so there should be no native memory used by the
> RocksDB state backend. Then I guess it is either heap memory or direct memory
> being over-used. The managed heap memory is mainly used by batch operators and
> direct memory is used by the network shuffle. Can you further check whether there
> are any logs indicating a HeapOutOfMemory or DirectOutOfMemory before it was killed?
> If the used memory exceeds the JVM configuration, it should throw that error. 
> Then we can further narrow down the scope. I can not remember the changes of 
> memory issues for managed memory or 

Re: DataSet API: HBase ScannerTimeoutException and double Result processing

2019-11-25 Thread OpenInx
> If the call to mapResultToOutType(Result) finished without an error there
is no need to restart from the same row.
> The new scanner should start from the next row.
> Is that so or am I missing something?

Yeah, you are right. I've filed the issue
https://issues.apache.org/jira/browse/FLINK-14941 to address this bug.
Thanks.
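
Until that fix is in a release, a possible user-side sketch of the same idea
(my own workaround, not the committed fix): when re-creating the scanner after
the timeout, restart strictly after the last row that was already emitted, e.g.
by appending a 0x00 byte, since Scan#setStartRow in HBase 1.x is inclusive.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

final class ScannerRestart {

    private ScannerRestart() {
    }

    // Smallest row key strictly greater than lastEmittedRow: with an inclusive
    // start row, a restarted scan then skips the row that was already handed
    // to mapResultToOutType().
    static byte[] exclusiveStartRow(byte[] lastEmittedRow) {
        return Bytes.add(lastEmittedRow, new byte[] {0});
    }

    static void restartAfter(Scan scan, byte[] lastEmittedRow) {
        scan.setStartRow(exclusiveStartRow(lastEmittedRow));
    }
}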


On Mon, Nov 25, 2019 at 6:57 PM Mark Davis  wrote:

> Hi Flavio,
>
>
> When the resultScanner dies because of a timeout (this happens a lot when
>> you have backpressure and the time between 2 consecutive reads exceed the
>> scanner timeout), the code creates a new scanner and restart from where it
>> was (starRow = currentRow).
>> So there should not be any duplicates (in theory), but this could be the
>> root of the problem..
>>
>
> Yes, you are right, the nextRecord() exception handling is responsible for
> the duplicate record processing:
>
> org.apache.hadoop.hbase.client.ScannerTimeoutException: 1038878ms passed
> since the last invocation, timeout is currently set to 6
> at
> org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:453)
> at
> org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:371)
> at
> org.apache.flink.addons.hbase.AbstractTableInputFormat.nextRecord(AbstractTableInputFormat.java:130)
> at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:192)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.hbase.UnknownScannerException:
> org.apache.hadoop.hbase.UnknownScannerException: Name: 135281, already
> closed?
> at
> org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2389)
> at
> org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:32385)
> at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2150)
> at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
> at
> org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:187)
> at
> org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:167)
>
> But I am not sure that the handling of the HBase exception thrown from
> ClientScanner.next() is correct.
> If the call to mapResultToOutType(Result) finished without an error there
> is no need to restart from the same row.
> The new scanner should start from the next row.
> Is that so or am I missing something?
>
> Best regards,
>   Mark
>
>


Re: Apache Flink - Uid and name for Flink sources and sinks

2019-11-25 Thread M Singh
 
Thanks Dian for your pointers.

Mans

On Sunday, November 24, 2019, 08:57:53 PM EST, Dian Fu wrote:
 
 Hi Mans,
Please see my reply inline below.


On Nov 25, 2019, at 5:42 AM, M Singh wrote:
 Thanks Dian for your answers.
A few more questions:
1. If I do not assign uids to operators/sources and sinks - I am assuming the
framework assigns one. Now how does another run of the same application,
using the previous run's savepoint/checkpoint, match its tasks/operators to the
savepoint/checkpoint state of the application ?

You are right that the framework will generate a uid for an operator if it's
not assigned. The uid is generated in a deterministic way to ensure that the
uid for the same operator remains the same as in previous runs (under certain
conditions).
The uid generation algorithm:
https://github.com/apache/flink/blob/fd511c345eac31f03b801ff19dbf1f8c86aae760/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java#L78


2. Is the operatorID in the checkpoint state the same as uid ?  

3. Do you have any pointer as to how an operatorID is generated for the
checkpoint and how it can be mapped back to the operator for troubleshooting
purposes ?

The OperatorID is constructed from the uid and they are the same:
https://github.com/apache/flink/blob/66b979efc7786edb1a207339b8670d0e82c459a7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L292


Regarding id attribute - I meant the following:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L139


However, I realized that this is not unique across application runs and so is
not a good candidate.
Thanks again for your help.





On Sunday, November 24, 2019, 04:55:55 AM EST, Dian Fu wrote:

 1. Should we assign uid and name to the sources and sinks too ?  
>> If the sources/sinks use state, you should assign a uid for them. This
>> is usually true for sources.

2. What are the pros and cons of adding uid to sources and sinks ?
>> I'm not seeing the cons for assigning uid to sources and sinks. So I guess 
>> assigning the uids for sources/sinks is always a good practice.

3. The sinks have uid and hashUid - which is the preferred attribute to use  
for allowing job restarts ?
>> Could you see if this answers your question:
>> https://stackoverflow.com/questions/46112142/apache-flink-set-operator-uid-vs-uidhash

4. If sink and sources uid are not provided in the application, can they still
maintain state across job restarts from checkpoints ?
>> It depends on whether the sources/sinks use state. I think most sources use
>> state to maintain the read offset.

5. Can the sinks and sources without uid restart from savepoints ?
>> The same as above.

6. The data streams have an attribute id -  How is this generated and can this 
be used for creating a uid for the sink ?  
>> Not sure what you mean by "attribute id". Could you give some more
>> detailed information about it?

Regards,
Dian
On Fri, Nov 22, 2019 at 6:27 PM M Singh  wrote:

 
Hi Folks - Please let me know if you have any advice on the best practices for
setting uid for sources and sinks. Thanks.

Mans

On Thursday, November 21, 2019, 10:10:49 PM EST, M Singh wrote:
 
 Hi Folks:
I am assigning uid and name for all stateful processors in our application and 
wanted to find out the following:
1. Should we assign uid and name to the sources and sinks too ?
2. What are the pros and cons of adding uid to sources and sinks ?
3. The sinks have uid and hashUid - which is the preferred attribute to use for allowing job restarts ?
4. If sink and sources uid are not provided in the application, can they still maintain state across job restarts from checkpoints ?
5. Can the sinks and sources without uid restart from savepoints ?
6. The data streams have an attribute id - how is this generated and can this be used for creating a uid for the sink ?
Thanks for your help.
Mans  
  

  

Re: Apache Flink - Throttling stream flow

2019-11-25 Thread M Singh
Thanks Caizhi & Thomas for your responses.
I read the throttling example but want to see if that works with a distributed
broker like Kinesis, and how to feed throttling information back to the Kinesis
source so that it can vary the rate without interfering with watermarks, etc.
Thanks again 
Mans

On Monday, November 25, 2019, 05:55:21 AM EST, Thomas Julian wrote:
 
 related

https://issues.apache.org/jira/browse/FLINK-13792

Regards,
Julian.


On Mon, 25 Nov 2019 15:25:14 +0530, Caizhi Weng wrote:


Hi,

As far as I know, Flink currently doesn't have a built-in throttling function.
You can write your own user-defined function to achieve this. Your function
just emits what it reads in, while limiting the rate at which it emits records.

If you're not familiar with user-defined functions, see 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html

Here is a throttling iterator example: 
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/utils/ThrottledIterator.java

M Singh wrote on Mon, Nov 25, 2019 at 5:50 AM:


Hi:

I have a Flink streaming application that invokes some other web services.
However, the web services have limited throughput. So I wanted to find out if
there are any recommendations on how to throttle the Flink datastream so that it
doesn't overload the downstream services. I am using Kinesis as source and sink
in my application.

Please let me know if there are any hooks available in Flink, what patterns
can be used, and what the best practices/pitfalls are for using them.

Thanks 
Mans




  

Apache Flink - High Available K8 Setup Wrongly Marked Dead TaskManager

2019-11-25 Thread Eray Arslan
Hi, 

I have some trouble with my HA K8s cluster.
Currently my Flink application runs an infinite stream (with parallelism 12).
After a few days I lose my task managers, and they never reconnect to the job
manager. Because of this, the application cannot be restored by the restart policy.

I did a few searches and found the “akka.watch” configurations, but they didn’t
work. I think this issue will solve the problem
(https://issues.apache.org/jira/browse/FLINK-13883). Am I right? Is there any
workaround I can apply to solve this problem?

Thanks

Eray




Re: flink session cluster ha on k8s

2019-11-25 Thread Yun Tang
Currently, you still need a ZooKeeper service to enable HA on k8s, and the
configuration for this part is no different from YARN mode [1].
By the way, there are also other solutions to implement HA, like etcd [2], but
they are still under discussion.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/jobmanager_high_availability.html#config-file-flink-confyaml
[2] https://issues.apache.org/jira/browse/FLINK-11105
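
For completeness, a minimal flink-conf.yaml sketch of the ZooKeeper-based setup
described in [1] (host names and paths below are placeholders):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: hdfs:///flink/ha/
# Give each session cluster its own id if several clusters share one ZooKeeper.
high-availability.cluster-id: /my-session-cluster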

Best
Yun Tang

From: 曾祥才 
Date: Monday, November 25, 2019 at 9:28 AM
To: User-Flink 
Subject: flink session cluster ha on k8s

Hi, is there any example of HA on k8s for a Flink session cluster?
I've checked the docs on flink.apache.org, but there seems to be no info about this.


Re: DataSet API: HBase ScannerTimeoutException and double Result processing

2019-11-25 Thread Mark Davis
Hi Flavio,

>> When the resultScanner dies because of a timeout (this happens a lot when 
>> you have backpressure and the time between 2 consecutive reads exceed the 
>> scanner timeout), the code creates a new scanner and restart from where it 
>> was (starRow = currentRow).
>> So there should not be any duplicates (in theory), but this could be the 
>> root of the problem..

Yes, you are right, the nextRecord() exception handling is responsible for the 
duplicate record processing:

org.apache.hadoop.hbase.client.ScannerTimeoutException: 1038878ms passed since 
the last invocation, timeout is currently set to 6
at 
org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:453)
at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:371)
at 
org.apache.flink.addons.hbase.AbstractTableInputFormat.nextRecord(AbstractTableInputFormat.java:130)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:192)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.hbase.UnknownScannerException: 
org.apache.hadoop.hbase.UnknownScannerException: Name: 135281, already closed?
at 
org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2389)
at 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:32385)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2150)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:187)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:167)

But I am not sure that the handling of the HBase exception thrown from 
ClientScanner.next() is correct.
If the call to mapResultToOutType(Result) finished without an error there is no 
need to restart from the same row.
The new scanner should start from the next row.
Is that so or am I missing something?

Best regards,
  Mark

Re: Apache Flink - Throttling stream flow

2019-11-25 Thread Thomas Julian
related



https://issues.apache.org/jira/browse/FLINK-13792



Regards,

Julian.







On Mon, 25 Nov 2019 15:25:14 +0530, Caizhi Weng wrote:



Hi,



As far as I know, Flink currently doesn't have a built-in throttling function.
You can write your own user-defined function to achieve this. Your function
just emits what it reads in, while limiting the rate at which it emits records.



If you're not familiar with user-defined functions, see 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html



Here is a throttling iterator example: 
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/utils/ThrottledIterator.java




M Singh wrote on Mon, Nov 25, 2019 at 5:50 AM:





Hi:



I have a Flink streaming application that invokes some other web services.
However, the web services have limited throughput. So I wanted to find out if
there are any recommendations on how to throttle the Flink datastream so that it
doesn't overload the downstream services. I am using Kinesis as source and sink
in my application.



Please let me know if there are any hooks available in Flink, what patterns
can be used, and what the best practices/pitfalls are for using them.


Thanks 


Mans

Re: Flink Kudu Connector

2019-11-25 Thread vino yang
Hi Rahul,

I only found some resources on the Internet that you can consider. [1][2]

Best,
Vino

[1]: https://bahir.apache.org/docs/flink/current/flink-streaming-kudu/
[2]:
https://www.slideshare.net/0xnacho/apache-flink-kudu-a-connector-to-develop-kappa-architectures

Rahul Jain wrote on Mon, Nov 25, 2019 at 6:32 PM:

> Hi,
>
> We are trying to use the Flink Kudu connector. Is there any documentation
> available that we can read to understand how to use it ?
>
> We found some sample code but that was not very helpful.
>
> Thanks,
> -rahul
>


Flink Kudu Connector

2019-11-25 Thread Rahul Jain
Hi,

We are trying to use the Flink Kudu connector. Is there any documentation
available that we can read to understand how to use it ?

We found some sample code but that was not very helpful.

Thanks,
-rahul


Re: Apache Flink - Throttling stream flow

2019-11-25 Thread Caizhi Weng
Hi,

As far as I know, Flink currently doesn't have a built-in throttling
function. You can write your own user-defined function to achieve this.
Your function just emits what it reads in, while limiting the rate at which
it emits records.

If you're not familiar with user-defined functions, see
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html

Here is a throttling iterator example:
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/utils/ThrottledIterator.java
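
A rough sketch of such a pass-through function, assuming Guava is on the
classpath (the limit is per parallel subtask, so the global rate is roughly
limit * parallelism). Blocking in map() simply builds backpressure, which
eventually slows the source down:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import com.google.common.util.concurrent.RateLimiter;

// Pass-through map that limits how many records per second each parallel
// subtask forwards to the downstream operator.
public class ThrottleFunction<T> extends RichMapFunction<T, T> {

    private final double recordsPerSecondPerSubtask;
    private transient RateLimiter rateLimiter;

    public ThrottleFunction(double recordsPerSecondPerSubtask) {
        this.recordsPerSecondPerSubtask = recordsPerSecondPerSubtask;
    }

    @Override
    public void open(Configuration parameters) {
        rateLimiter = RateLimiter.create(recordsPerSecondPerSubtask);
    }

    @Override
    public T map(T value) {
        rateLimiter.acquire();  // blocks until a permit is available
        return value;
    }
}

You would put it right before the operator that calls the web service, e.g.
stream.map(new ThrottleFunction<>(100.0)).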

M Singh wrote on Mon, Nov 25, 2019 at 5:50 AM:

> Hi:
>
> I have a Flink streaming application that invokes some other web
> services. However, the web services have limited throughput. So I wanted to
> find out if there are any recommendations on how to throttle the Flink
> datastream so that it doesn't overload the downstream services. I am
> using Kinesis as source and sink in my application.
>
> Please let me know if there are any hooks available in Flink, what
> patterns can be used, and what the best practices/pitfalls are for
> using them.
>
> Thanks
>
> Mans
>


Re: Idiomatic way to split pipeline

2019-11-25 Thread Avi Levi
Thanks, I'll check it out.

On Mon, Nov 25, 2019 at 11:46 AM vino yang  wrote:

> Hi Avi,
>
> The side output provides a superset of split's functionality, so anything
> that can be implemented via split can also be implemented via side output. [1]
>
> Best,
> Vino
>
> [1]:
> https://stackoverflow.com/questions/51440677/apache-flink-whats-the-difference-between-side-outputs-and-split-in-the-data
> 
>
> Avi Levi wrote on Mon, Nov 25, 2019 at 5:32 PM:
>
>> Thank you for your quick reply, I appreciate that. But this is not
>> exactly "side output" per se, it is simple splitting. IIUC the side output
>> is more for splitting the records by something that differentiates them
>> (lateness, value, etc.). I thought there was something more idiomatic, but
>> if this is it, then I will go with that.
>>
>> On Mon, Nov 25, 2019 at 10:42 AM vino yang  wrote:
>>
>>> Hi Avi,
>>>
>>> As the doc of DataStream#split said, you can use the "side output"
>>> feature to replace it.[1]
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>> 
>>>
>>> Best,
>>> Vino
>>>
>>> Avi Levi wrote on Mon, Nov 25, 2019 at 4:12 PM:
>>>
 Hi,
 I want to split the output of one of the operators to two pipelines.
 Since the *split* method is deprecated, what is the idiomatic way to
 do that without duplicating the operator ?

 [image: Screen Shot 2019-11-25 at 10.05.38.png]





Re: Idiomatic way to split pipeline

2019-11-25 Thread vino yang
Hi Avi,

The side output provides a superset of split's functionality, so anything
that can be implemented via split can also be implemented via side output. [1]

Best,
Vino

[1]:
https://stackoverflow.com/questions/51440677/apache-flink-whats-the-difference-between-side-outputs-and-split-in-the-data

Avi Levi wrote on Mon, Nov 25, 2019 at 5:32 PM:

> Thank you for your quick reply, I appreciate that. But this is not
> exactly "side output" per se, it is simple splitting. IIUC the side output
> is more for splitting the records by something that differentiates them
> (lateness, value, etc.). I thought there was something more idiomatic, but
> if this is it, then I will go with that.
>
> On Mon, Nov 25, 2019 at 10:42 AM vino yang  wrote:
>
>> Hi Avi,
>>
>> As the doc of DataStream#split said, you can use the "side output"
>> feature to replace it.[1]
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>
>> Best,
>> Vino
>>
>> Avi Levi wrote on Mon, Nov 25, 2019 at 4:12 PM:
>>
>>> Hi,
>>> I want to split the output of one of the operators to two pipelines.
>>> Since the *split* method is deprecated, what is the idiomatic way to do
>>> that without duplicating the operator ?
>>>
>>> [image: Screen Shot 2019-11-25 at 10.05.38.png]
>>>
>>>
>>>


Re: Metrics for Task States

2019-11-25 Thread Caizhi Weng
Hi Kelly,

As far as I know Flink currently does not have such metrics for monitoring
the number of tasks in each state. See
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
for the complete metrics list. (It seems that `taskSlotsAvailable` in the
metrics list is the most closely related metric.)

But Flink has a REST API which can provide states for all the tasks
(http://hostname:port/overview). This endpoint returns a JSON string containing
all the metrics you want. Maybe you can write your own tool to monitor
this API.
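
For a per-job breakdown you can also poll the job detail endpoint. A small
sketch (I am assuming the /jobs/<job-id> response contains a "status-counts"
map with the number of tasks per execution state; the exact field names may
differ between versions, and the host, port and job id below are placeholders):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class TaskStateProbe {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://jobmanager-host:8081/jobs/<job-id>");
        try (BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line);
            }
            // Parse the JSON with your favourite library and feed the
            // "status-counts" numbers (SCHEDULED, DEPLOYING, RUNNING, ...)
            // into your monitoring system.
            System.out.println(body);
        }
    }
}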

If you really want to have metrics that describe the number of tasks in
each state, you can open up a JIRA ticket at
https://issues.apache.org/jira/projects/FLINK/issues/

Thank you

Kelly Smith wrote on Mon, Nov 25, 2019 at 12:59 AM:

> With EMR/YARN, the cluster is definitely running in session mode. It
> exists independently of any job and continues running after the job exits.
>
> Whether or not this is a bug in Flink, is it possible to get access to the
> metrics I'm asking about? Those would be useful even if this behavior is
> fixed.
>
> Get Outlook for Android 
>
> --
> *From:* Piper Piper 
> *Sent:* Friday, November 22, 2019 9:10:41 PM
> *To:* Kelly Smith 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Metrics for Task States
>
> I am trying to reason why this problem should occur (i.e. why Flink could
> not reject the job when it required more slots than were available).
>
> Flink in production on EMR (YARN): Does this mean Flink was being run in
> Job mode or Session mode?
>
> Thank you,
>
> Piper
>
> On Thu, Nov 21, 2019 at 4:56 PM Piper Piper  wrote:
>
> Thank you, Kelly!
>
> On Thu, Nov 21, 2019 at 4:06 PM Kelly Smith 
> wrote:
>
> Hi Piper,
>
>
>
> The repro is pretty simple:
>
>- Submit a job with parallelism set higher than YARN has resources to
>support
>
>
>
> What this ends up looking like in the Flink UI is this:
>
>
>
> The Job is in a “RUNNING” state, but all of the tasks are in the
> “SCHEDULED” state. The `jobmanager.numRunningJobs` metric that Flink emits
> by default will increase by 1, but none of the tasks actually get scheduled
> on any TM.
>
>
>
>
>
> What I’m looking for is a way to detect when I am in this state using
> Flink metrics (ideally the count of tasks in each state for better
> observability).
>
>
>
> Does that make sense?
>
>
>
> Thanks,
>
> Kelly
>
>
>
> *From: *Piper Piper 
> *Date: *Thursday, November 21, 2019 at 12:59 PM
> *To: *Kelly Smith 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Metrics for Task States
>
>
>
> Hello Kelly,
>
>
>
> I thought that Flink scheduler only starts a job if all requested
> containers/TMs are available and allotted to that job.
>
>
>
> How can I reproduce your issue on Flink with YARN?
>
>
>
> Thank you,
>
>
>
> Piper
>
>
>
>
>
> On Thu, Nov 21, 2019, 1:48 PM Kelly Smith  wrote:
>
> I’ve been running Flink in production on EMR (YARN) for some time and have
> found the metrics system to be quite useful, but there is one specific case
> where I’m missing a signal for this scenario:
>
>
>
>- When a job has been submitted, but YARN does not have enough
>resources to provide
>
>
>
> Observed:
>
>- Job is in RUNNING state
>- All of the tasks for the job are in the (I believe) DEPLOYING state
>
>
>
> Is there a way to access these as metrics for monitoring the number of
> tasks in each state for a given job (image below)? The metric I’m currently
> using is the number of running jobs, but it misses this “unhealthy”
> scenario. I realize that I could use application-level metrics (record
> counts, etc) as a proxy for this, but I’m working on providing a streaming
> platform and need all of my monitoring to be application agnostic.
>
> [image: cid:image001.png@01D5A059.19DB3EB0]
>
>
>
> I can’t find anything on it in the documentation.
>
>
>
> Thanks,
>
> Kelly
>
>


Re: DataSet API: HBase ScannerTimeoutException and double Result processing

2019-11-25 Thread Flavio Pompermaier
Maybe the problem is indeed this..the fact that the scan starts from the
last seen row..in this case maybe the first result should be skipped
because it was already read..

On Mon, Nov 25, 2019 at 10:22 AM Flavio Pompermaier 
wrote:

> What I can tell is how the HBase input format works..if you look
> at AbstractTableInputFormat [1] this is the nextRecord() function:
>
> public T nextRecord(T reuse) throws IOException {
>     if (resultScanner == null) {
>         throw new IOException("No table result scanner provided!");
>     }
>     try {
>         Result res = resultScanner.next();
>         if (res != null) {
>             scannedRows++;
>             currentRow = res.getRow();
>             return mapResultToOutType(res);
>         }
>     } catch (Exception e) {
>         resultScanner.close();
>         // workaround for timeout on scan
>         LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
>         scan.setStartRow(currentRow);
>         resultScanner = table.getScanner(scan);
>         Result res = resultScanner.next();
>         if (res != null) {
>             scannedRows++;
>             currentRow = res.getRow();
>             return mapResultToOutType(res);
>         }
>     }
>
>     endReached = true;
>     return null;
> }
>
> When the resultScanner dies because of a timeout (this happens a lot when
> you have backpressure and the time between 2 consecutive reads exceed the
> scanner timeout), the code creates a new scanner and restart from where it
> was (starRow = currentRow).
> So there should not be any duplicates (in theory), but this could be the
> root of the problem..
>
> Best,
> Flavio
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
>
> On Sat, Nov 23, 2019 at 11:07 PM Mark Davis 
> wrote:
>
>> Hello,
>>
>> I am reading Results from an HBase table and processing them with the Batch API.
>> Everything works fine until I receive a ScannerTimeoutException from HBase.
>> Maybe my transformations get stuck or a GC pause happens - hard to tell.
>> The HBase client restarts the scan and the processing continues.
>> Except one problem - every time I receive this Exception I observe a
>> duplicate Result processing - the Result which was processed just before
>> ScannerTimeoutException is thrown is processed twice.
>>
>> Is this expected behavior? Should I be prepared to handle it?
>> And how should I handle it? Keeping track of all processed Results is not
>> feasible in my case.
>>
>> Here is a simple job demonstrating an issue (HBase scan and RPC timeouts
>> are set to 60 sec)
>>
>> Thank you!
>>
>> Best regards,
>> Mark
>>
>>
>>   public static void main(String[] args) throws Exception {
>> ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>> env.setParallelism(1);
>>
>> env.createInput(new Src())
>> .map(new Mapper())
>> .print();
>>   }
>>
>>   private static class Mapper implements MapFunction<Tuple1<String>, String> {
>>
>> private int cnt = 0;
>>
>> @Override
>> public String map(Tuple1<String> value) throws Exception {
>>   if (cnt++ % 2 == 0) {
>> Thread.sleep(12);
>>   }
>>   return value.f0;
>> }
>>
>>   }
>>
>>   private static class Src extends AbstractTableInputFormat<Tuple1<String>> {
>>
>> @Override
>> protected Scan getScanner() {
>>   Scan scan = new Scan();
>>   scan.setStartRow(getStartRow());
>>   scan.setStopRow(getEndRow());
>>   scan.setCaching(1);
>>   scan.setCacheBlocks(false);
>>   return scan;
>> }
>>
>> @Override
>> protected String getTableName() {
>>   return getTable();
>> }
>>
>> @Override
>> protected Tuple1<String> mapResultToOutType(Result r) {
>>   return new Tuple1<String>(Bytes.toString(r.getRow()));
>> }
>>
>> @Override
>> public void configure(org.apache.flink.configuration.Configuration
>> parameters) {
>>   scan = getScanner();
>>   try {
>> table = new HTable(getHadoopConf(), getTableName());
>>   } catch (IOException e) {
>> e.printStackTrace();
>>   }
>> }
>>
>>   }
>>
>
>


Re: Idiomatic way to split pipeline

2019-11-25 Thread Avi Levi
Thank you for your quick reply, I appreciate that. But this is not
exactly "side output" per se, it is simple splitting. IIUC the side output
is more for splitting the records by something that differentiates them
(lateness, value, etc.). I thought there was something more idiomatic, but
if this is it, then I will go with that.

On Mon, Nov 25, 2019 at 10:42 AM vino yang  wrote:

> Hi Avi,
>
> As the doc of DataStream#split said, you can use the "side output" feature
> to replace it.[1]
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
> 
>
> Best,
> Vino
>
> Avi Levi wrote on Mon, Nov 25, 2019 at 4:12 PM:
>
>> Hi,
>> I want to split the output of one of the operators to two pipelines.
>> Since the *split* method is deprecated, what is the idiomatic way to do
>> that without duplicating the operator ?
>>
>> [image: Screen Shot 2019-11-25 at 10.05.38.png]
>>
>>
>>


Re: DataSet API: HBase ScannerTimeoutException and double Result processing

2019-11-25 Thread Flavio Pompermaier
What I can tell is how the HBase input format works..if you look
at AbstractTableInputFormat [1] this is the nextRecord() function:

public T nextRecord(T reuse) throws IOException {
    if (resultScanner == null) {
        throw new IOException("No table result scanner provided!");
    }
    try {
        Result res = resultScanner.next();
        if (res != null) {
            scannedRows++;
            currentRow = res.getRow();
            return mapResultToOutType(res);
        }
    } catch (Exception e) {
        resultScanner.close();
        // workaround for timeout on scan
        LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
        scan.setStartRow(currentRow);
        resultScanner = table.getScanner(scan);
        Result res = resultScanner.next();
        if (res != null) {
            scannedRows++;
            currentRow = res.getRow();
            return mapResultToOutType(res);
        }
    }

    endReached = true;
    return null;
}

When the resultScanner dies because of a timeout (this happens a lot when
you have backpressure and the time between 2 consecutive reads exceed the
scanner timeout), the code creates a new scanner and restart from where it
was (starRow = currentRow).
So there should not be any duplicates (in theory), but this could be the
root of the problem..

Best,
Flavio

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java

On Sat, Nov 23, 2019 at 11:07 PM Mark Davis  wrote:

> Hello,
>
> I am reading Results from an HBase table and processing them with the Batch API.
> Everything works fine until I receive a ScannerTimeoutException from HBase.
> Maybe my transformations get stuck or a GC pause happens - hard to tell.
> The HBase client restarts the scan and the processing continues.
> Except one problem - every time I receive this Exception I observe a
> duplicate Result processing - the Result which was processed just before
> ScannerTimeoutException is thrown is processed twice.
>
> Is this expected behavior? Should I be prepared to handle it?
> And how should I handle it? Keeping track of all processed Results is not
> feasible in my case.
>
> Here is a simple job demonstrating an issue (HBase scan and RPC timeouts
> are set to 60 sec)
>
> Thank you!
>
> Best regards,
> Mark
>
>
>   public static void main(String[] args) throws Exception {
> ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
>
> env.createInput(new Src())
> .map(new Mapper())
> .print();
>   }
>
>   private static class Mapper implements MapFunction<Tuple1<String>, String> {
>
> private int cnt = 0;
>
> @Override
> public String map(Tuple1<String> value) throws Exception {
>   if (cnt++ % 2 == 0) {
> Thread.sleep(12);
>   }
>   return value.f0;
> }
>
>   }
>
> private static class Src extends AbstractTableInputFormat<Tuple1<String>> {
>
> @Override
> protected Scan getScanner() {
>   Scan scan = new Scan();
>   scan.setStartRow(getStartRow());
>   scan.setStopRow(getEndRow());
>   scan.setCaching(1);
>   scan.setCacheBlocks(false);
>   return scan;
> }
>
> @Override
> protected String getTableName() {
>   return getTable();
> }
>
> @Override
> protected Tuple1<String> mapResultToOutType(Result r) {
>   return new Tuple1<String>(Bytes.toString(r.getRow()));
> }
>
> @Override
> public void configure(org.apache.flink.configuration.Configuration
> parameters) {
>   scan = getScanner();
>   try {
> table = new HTable(getHadoopConf(), getTableName());
>   } catch (IOException e) {
> e.printStackTrace();
>   }
> }
>
>   }
>


Re: Idiomatic way to split pipeline

2019-11-25 Thread vino yang
Hi Avi,

As the doc of DataStream#split said, you can use the "side output" feature
to replace it.[1]

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
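
A minimal sketch of the splitting itself: emit every record to the main output
and to a side output, then hang one pipeline off each.

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SplitWithSideOutput {

    // Anonymous subclass (note the {}) so the element type information is retained.
    private static final OutputTag<Integer> SECOND_PIPELINE = new OutputTag<Integer>("second") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Integer> main = env
                .fromElements(1, 2, 3, 4, 5, 6)
                .process(new ProcessFunction<Integer, Integer>() {
                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                        out.collect(value);                  // main output -> pipeline 1
                        ctx.output(SECOND_PIPELINE, value);  // same record -> pipeline 2
                    }
                });

        main.print();                                        // pipeline 1 continues here
        main.getSideOutput(SECOND_PIPELINE).print();         // pipeline 2 continues here

        env.execute("split-with-side-output");
    }
}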

Best,
Vino

Avi Levi wrote on Mon, Nov 25, 2019 at 4:12 PM:

> Hi,
> I want to split the output of one of the operators to two pipelines. Since
> the *split* method is deprecated, what is the idiomatic way to do that
> without duplicating the operator ?
>
> [image: Screen Shot 2019-11-25 at 10.05.38.png]
>
>
>


Re: How to submit two jobs sequentially and view their outputs in .out file?

2019-11-25 Thread vino yang
Hi Komal,

> Thank you! That's exactly what's happening. Is there any way to force it
write to a specific .out of a TaskManager?

No. I am curious why the two jobs depend on stdout - can we introduce
another channel other than stdout? IMO, this mechanism is not always
available.
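
For example, one option is to write each job's results to its own file instead
of relying on print()/stdout; a small sketch (the paths and the inline source
below are placeholders):

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WriteToFileInsteadOfStdout {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; in the real jobs this would be the Kafka topic mentioned below.
        DataStream<String> results = env.fromElements("a", "b", "c");

        // Each job writes to its own path, so the results are easy to find
        // regardless of which TaskManager the sink runs on.
        results.writeAsText("file:///tmp/job1-results", FileSystem.WriteMode.OVERWRITE)
               .setParallelism(1);

        env.execute("job1");
    }
}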

Best,
Vino

Komal Mariam wrote on Mon, Nov 25, 2019 at 10:46 AM:

> Hi Theo,
>
> I want to interrupt/cancel my current job as it has produced the desired
> results even though it runs infinitely,  and the next one requires full
> resources.
>
> Due to some technical issue we cannot access the web UI so just working
> with the CLI, for now.
>
> I found a less crude way by running the command ./bin/flink cancel <job id>, as specified by the commands listed here:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html
>
> 
>
> Hello Vino,
>
> Thank you! That's exactly what's happening. Is there any way to force it
> write to a specific .out of a TaskManager?
>
>
> Best Regards,
> Komal
>
>
>
> On Mon, 25 Nov 2019 at 11:10, vino yang  wrote:
>
>> Hi Komal,
>>
>> Since you use the Flink standalone deployment mode, the tasks of the jobs
>> which print information to STDOUT may be randomly deployed to any task
>> manager of the cluster. Did you check the other TaskManagers' .out files?
>>
>> Best,
>> Vino
>>
>> Komal Mariam wrote on Fri, Nov 22, 2019 at 6:59 PM:
>>
>>> Dear all,
>>>
>>> Thank you for your help regarding my previous queries. Unfortunately,
>>> I'm stuck with another one and will really appreciate your input.
>>>
>>> I can't seem to produce any outputs in "flink-taskexecutor-0.out" from
>>> my second job after submitting the first one in my 3-node-flink standalone
>>> cluster.
>>>
>>> Say I want to test out two jobs sequentially. (I do not want to run them
>>> concurrently/in parallel).
>>>
>>> After submitting "job1.jar " via command line, I press "Ctrl + C" to
>>> exit from it (as it runs infinitely). After that I
>>> try to submit a second jar file having the same properties (group-id,
>>> topic, etc) with the only difference being the query written in main
>>> function.
>>>
>>> The first job produces relevant outputs in "flink-taskexecutor-0.out"
>>> but the second one doesn't.
>>>
>>> The only way I can see the output produced is if I restart the cluster
>>> after job1 and then submit job2 as it produces another .out file.
>>>
>>> But I want to submit 2 jobs sequentially and see their outputs without
>>> having to restart my cluster. Is there any way to do this?
>>>
>>> Additional info:
>>> For both jobs I'm using DataStream API and I have set:
>>>  StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> Best Regards,
>>> Komal
>>>
>>


Idiomatic way to split pipeline

2019-11-25 Thread Avi Levi
Hi,
I want to split the output of one of the operators to two pipelines. Since
the *split* method is deprecated, what is the idiomatic way to do that
without duplicating the operator ?

[image: Screen Shot 2019-11-25 at 10.05.38.png]