Re: Are there pipeline API's for ETL?

2020-01-10 Thread kant kodali
Hi Vino,

Another use case would be that I want to build a DAG of batch sources, sinks, and 
transforms and schedule the jobs periodically. You could say it is similar to 
Airflow, but a Flink API would be a lot better!

Sent from my iPhone

> On Jan 10, 2020, at 6:42 PM, vino yang  wrote:
> 
> 
> Hi kant,
> 
> Can you provide more context about your question? What do you mean about 
> "pipeline API"?
> 
> IMO, you can build an ETL pipeline via composing several Flink transform 
> APIs. About choosing which transform APIs, it depends on your business logic. 
> 
> Here are the generic APIs list.[1]
> 
> Best,
> Vino
> 
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/index.html
> 
> kant kodali wrote on Sat, Jan 11, 2020 at 9:06 AM:
>> Hi All,
>> 
>> I am wondering if there are pipeline API's for ETL?
>> 
>> Thanks!
>> 
>> 


Re: Are there pipeline API's for ETL?

2020-01-10 Thread kant kodali
Hi Vino,

I am new to Flink. I was thinking more of a DAG builder API where I can build 
a DAG of sources, sinks, and transforms, and hopefully Flink takes care of the entire 
life cycle of the DAG.

An example would be the CDAP pipeline API.

Sent from my iPhone

> On Jan 10, 2020, at 6:42 PM, vino yang  wrote:
> 
> 
> Hi kant,
> 
> Can you provide more context about your question? What do you mean about 
> "pipeline API"?
> 
> IMO, you can build an ETL pipeline via composing several Flink transform 
> APIs. About choosing which transform APIs, it depends on your business logic. 
> 
> Here are the generic APIs list.[1]
> 
> Best,
> Vino
> 
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/index.html
> 
> kant kodali wrote on Sat, Jan 11, 2020 at 9:06 AM:
>> Hi All,
>> 
>> I am wondering if there are pipeline API's for ETL?
>> 
>> Thanks!
>> 
>> 


Re: Are there pipeline API's for ETL?

2020-01-10 Thread vino yang
Hi kant,

Can you provide more context for your question? What do you mean by
"pipeline API"?

IMO, you can build an ETL pipeline by composing several Flink transform
APIs. Which transform APIs to choose depends on your business
logic.

Here is the list of the generic APIs. [1]

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/index.html
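
To make that concrete, here is a minimal sketch of an ETL-style pipeline composed from a
source, a couple of transforms, and a sink. It assumes the Java DataStream API; the socket
source, the host/port, and the output path are placeholders, not part of the original
discussion.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SimpleEtlPipeline {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.socketTextStream("localhost", 9999)        // Extract: placeholder source
               .filter(line -> !line.isEmpty())            // Transform: drop empty records
               .map(line -> line.trim().toLowerCase())     // Transform: normalize
               .writeAsText("/tmp/etl-output");            // Load: placeholder sink

            env.execute("simple ETL pipeline");
        }
    }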

kant kodali wrote on Sat, Jan 11, 2020 at 9:06 AM:

> Hi All,
>
> I am wondering if there are pipeline API's for ETL?
>
> Thanks!
>
>
>


Are there pipeline API's for ETL?

2020-01-10 Thread kant kodali
Hi All,

I am wondering if there are pipeline API's for ETL?

Thanks!


Re: Re: flink savepoint checkpoint

2020-01-10 Thread Px New
Hello, regarding your question, I noticed something interesting.
After I start a job in YARN per-job mode, I can see the job I launched in YARN's resource
management UI -> it has its own application ID. Then, when I go through YARN's Tracking UI ->
Application Master into the job's web UI (Flink's web UI) and click the "cancel" button there
to kill the program, an empty shell of the application is still left in the YARN management UI.
Only after I run `yarn application -kill <Id>` in the terminal is the program actually killed.
So my preliminary conclusion is that it is a stop of the program.

Best!
Px

amen...@163.com wrote on Fri, Jan 10, 2020 at 5:59 PM:

> hi, I understand that stopping a job with stop triggers a savepoint, emits a max_watermark
> before stopping, and registers the event-time timers. May I ask: if the job is stopped
> directly via yarn kill, does that count as a cancel, a stop, or something else?
>
>
>
> amen...@163.com
>
> From: Congxian Qiu
> Date: 2020-01-10 17:16
> To: user-zh
> Subject: Re: flink savepoint checkpoint
> Hi
> From Flink's point of view, checkpoints are used to recover from failovers that occur while
> the job is running, while savepoints are used to reuse state across jobs.
> In addition, starting from 1.9 you can try StopWithSavepoint [1]; there is also another
> community issue working on StopWithCheckpoint [2].
>
> [1] https://issues.apache.org/jira/browse/FLINK-11458
> [2] https://issues.apache.org/jira/browse/FLINK-12619
> Best,
> Congxian
>
>
> zhisheng wrote on Fri, Jan 10, 2020 at 11:39 AM:
>
> > hi, as I understand it, this parameter controls whether the previous checkpoints are
> > cleaned up when the job is cancelled, but those checkpoints are not necessarily the job's
> > latest state. If you trigger a savepoint as part of the cancel command, that state is the
> > latest and most complete one.
> >
> > Best!
> > zhisheng
> >
> > Px New <15701181132mr@gmail.com> wrote on Fri, Jan 10, 2020 at 10:58 AM:
> >
> > > Hello, regarding your question, Flink does have a configuration for this. When the
> > > program is stopped, the checkpoint is additionally retained
> > > -->
> > >
> > > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > >
> > >
> > > lucas.wu wrote on Wed, Dec 11, 2019 at 11:56 AM:
> > >
> > > > hi everyone:
> > > >
> > > > I have a question I would like to discuss: why does Flink's savepoint have to be
> > > > triggered manually? If no savepoint is taken when the program is stopped, I cannot
> > > > use the previously saved state when I restart. Why not follow Spark's approach: take
> > > > checkpoints periodically, and on startup specify the checkpoint path to continue from
> > > > where the last run left off?
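
For reference, a minimal sketch (assuming Flink 1.9's Java DataStream API) of enabling
externalized checkpoints that are retained on cancellation, as mentioned in the thread above;
the interval and the placeholder source/sink are illustrative:

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RetainedCheckpointsJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Take a checkpoint every 60 seconds.
            env.enableCheckpointing(60_000);

            // Keep the latest externalized checkpoint when the job is cancelled, so it can be
            // used later with `flink run -s <checkpointPath>` to resume from that state.
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            env.socketTextStream("localhost", 9999).print();  // placeholder pipeline
            env.execute("job with retained checkpoints");
        }
    }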


Re: How can I find out which key group belongs to which subtask

2020-01-10 Thread 杨东晓
Thanks Till, I will do some tests on this. Will this be a public
feature in the next release version or later?

Till Rohrmann wrote on Fri, Jan 10, 2020 at 6:15 AM:

> Hi,
>
> you would need to set the co-location constraint in order to ensure that
> the sub-tasks of operators are deployed to the same machine. It effectively
> means that subtasks a_i, b_i  of operator a and b will be deployed to the
> same slot. This feature is not super well exposed but you can take a look
> at [1] to see how it can be used.
>
> [1] https://issues.apache.org/jira/browse/FLINK-9809
>
> Cheers,
> Till
>
> On Fri, Jan 10, 2020 at 9:08 AM Zhijiang 
> wrote:
>
>> Only chained operators can avoid record serialization cost, but the
>> chaining mode can not support keyed stream.
>> If you want to deploy downstream with upstream in the same task manager,
>> it can avoid network shuffle cost which can still get performance benefits.
>> As I know @Till Rohrmann has implemented some enhancements in scheduler
>> layer to support such requirement in release-1.10. You can have a try when
>> the rc candidate is ready.
>>
>> Best,
>> Zhijiang
>>
>> --
>> From:杨东晓 
>> Send Time:2020 Jan. 10 (Fri.) 02:10
>> To:Congxian Qiu 
>> Cc:user 
>> Subject:Re: How can I find out which key group belongs to which subtask
>>
>> Thanks Congxian!
>>  My purpose is not only make data goes into one same subtask but the
>> specific subtask which belongs to same taskmanager with upstream record.
>> The key idea is to avoid shuffling  between taskmanagers.
>> I think the KeyGroupRangeAssignment.java
>> 
>> explained a lot about how to get keygroup and subtask context that can make
>> that happen.
>> Do you know if there are still  serialization happening while data
>> transferred between operator in same taskmanager?
>> Thanks.
>>
>> Congxian Qiu wrote on Thu, Jan 9, 2020 at 1:55 AM:
>> Hi
>>
>> If you just want to make sure some key goes into the same subtask, does
>> custom key selector[1] help?
>>
>> For the keygroup and subtask information, you can ref to
>> KeyGroupRangeAssignment[2] for more info, and the max parallelism logic you
>> can ref to doc[3]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-key-selector-functions
>> [2]
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism
>>
>> Best,
>> Congxian
>>
>>
>> 杨东晓 wrote on Thu, Jan 9, 2020 at 7:47 AM:
>> Hi , I'm trying to do some optimize about Flink 'keyby' processfunction.
>> Is there any possible I can find out one key belongs to which key-group
>> and essentially find out one key-group belongs to which subtask.
>> The motivation I want to know that is we want to  force the data records
>> from upstream still goes to same taskmanager downstream subtask .Which
>> means even if we use a keyedstream function we still want no cross jvm
>> communication happened during run time.
>> And if we can achieve that , can we also avoid the expensive cost for
>> record serialization because data is only transferred in same taskmanager
>> jvm instance?
>>
>> Thanks.
>>
>>
>>
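
To illustrate the key-to-subtask mapping discussed above, here is a minimal sketch that uses
the KeyGroupRangeAssignment utility linked in [2] of the quoted mail. The max parallelism,
parallelism, and key values are placeholders:

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class KeyGroupLookup {
        public static void main(String[] args) {
            int maxParallelism = 128;   // must match the job's max parallelism
            int parallelism = 4;        // the keyed operator's parallelism
            String key = "user-42";     // whatever your KeySelector returns

            // Key -> key group (murmur hash of the key's hashCode, modulo maxParallelism).
            int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

            // Key group -> index of the subtask that owns it.
            int subtaskIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                    maxParallelism, parallelism, keyGroup);

            System.out.printf("key=%s keyGroup=%d subtaskIndex=%d%n", key, keyGroup, subtaskIndex);
        }
    }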


Re: How can I find out which key group belongs to which subtask

2020-01-10 Thread 杨东晓
Thanks Zhijiang, it looks like serialization will always be there in a keyed
stream.

Zhijiang wrote on Fri, Jan 10, 2020 at 12:08 AM:

> Only chained operators can avoid record serialization cost, but the
> chaining mode can not support keyed stream.
> If you want to deploy downstream with upstream in the same task manager,
> it can avoid network shuffle cost which can still get performance benefits.
> As I know @Till Rohrmann has implemented some enhancements in scheduler
> layer to support such requirement in release-1.10. You can have a try when
> the rc candidate is ready.
>
> Best,
> Zhijiang
>
> --
> From:杨东晓 
> Send Time:2020 Jan. 10 (Fri.) 02:10
> To:Congxian Qiu 
> Cc:user 
> Subject:Re: How can I find out which key group belongs to which subtask
>
> Thanks Congxian!
>  My purpose is not only make data goes into one same subtask but the
> specific subtask which belongs to same taskmanager with upstream record.
> The key idea is to avoid shuffling  between taskmanagers.
> I think the KeyGroupRangeAssignment.java
> 
> explained a lot about how to get keygroup and subtask context that can make
> that happen.
> Do you know if there are still  serialization happening while data
> transferred between operator in same taskmanager?
> Thanks.
>
> Congxian Qiu wrote on Thu, Jan 9, 2020 at 1:55 AM:
> Hi
>
> If you just want to make sure some key goes into the same subtask, does
> custom key selector[1] help?
>
> For the keygroup and subtask information, you can ref to
> KeyGroupRangeAssignment[2] for more info, and the max parallelism logic you
> can ref to doc[3]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-key-selector-functions
> [2]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
> [3]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism
>
> Best,
> Congxian
>
>
> 杨东晓 wrote on Thu, Jan 9, 2020 at 7:47 AM:
> Hi , I'm trying to do some optimize about Flink 'keyby' processfunction.
> Is there any possible I can find out one key belongs to which key-group
> and essentially find out one key-group belongs to which subtask.
> The motivation I want to know that is we want to  force the data records
> from upstream still goes to same taskmanager downstream subtask .Which
> means even if we use a keyedstream function we still want no cross jvm
> communication happened during run time.
> And if we can achieve that , can we also avoid the expensive cost for
> record serialization because data is only transferred in same taskmanager
> jvm instance?
>
> Thanks.
>
>
>


Re: Please suggest helpful tools

2020-01-10 Thread Eva Eva
Thank you both for the suggestions.
I did a bit more analysis using the UI and identified at least one
problem that's occurring with the job right now. I'm going to fix it first and then
take it from there.

*Problem that I identified:*
I'm running with a parallelism of 26. For the checkpoints that are expiring, one
of the JOIN operations stops at 25/26 (96%) progress, with the corresponding
SubTask:21 showing an "n/a" value. For the same operation I also noticed that the
load is distributed poorly, with a heavy load being fed to SubTask:21.
My guess is that a bunch of null values occur for this JOIN operation and
all end up in the same task.
Currently I'm using a SQL query, which gives me limited control over handling
null values, so I'll try to JOIN programmatically and see if I can avoid the
JOIN operation whenever the joining value is null. This should help with
better load distribution across subtasks, and may also fix the expiring
checkpoint issue.

Thanks for the guidance.
Eva.

On Fri, Jan 10, 2020 at 7:44 AM Congxian Qiu  wrote:

> Hi
>
> For expired checkpoint, you can find something like " Checkpoint xxx of
> job xx expired before completing" in jobmanager.log, then you can go to the
> checkpoint UI to find which tasks did not ack, and go to these tasks to see
> what happened.
>
> If checkpoint was been declined, you can find something like "Decline
> checkpoint xxx by task xxx of job xxx at xxx." in jobmanager.log, in this
> case, you can go to the task directly to find out why the checkpoint failed.
>
> Best,
> Congxian
>
>
> Yun Tang wrote on Fri, Jan 10, 2020 at 7:31 PM:
>
>> Hi Eva
>>
>> If checkpoint failed, please view the web UI or jobmanager log to see why
>> checkpoint failed, might be declined by some specific task.
>>
>> If checkpoint expired, you can also access the web UI to see which tasks
>> did not respond in time, some hot task might not be able to respond in
>> time. Generally speaking, checkpoint expired is mostly caused by back
>> pressure which led the checkpoint barrier did not arrive in time. Resolve
>> the back pressure could help the checkpoint finished before timeout.
>>
>> I think the doc of monitoring web UI for checkpoint [1] and back pressure
>> [2] could help you.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/checkpoint_monitoring.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/back_pressure.html
>>
>> Best
>> Yun Tang
>> --
>> *From:* Eva Eva 
>> *Sent:* Friday, January 10, 2020 10:29
>> *To:* user 
>> *Subject:* Please suggest helpful tools
>>
>> Hi,
>>
>> I'm running Flink job on 1.9 version with blink planner.
>>
>> My checkpoints are timing out intermittently, but as state grows they are
>> timing out more and more often eventually killing the job.
>>
>> Size of the state is large with Minimum=10.2MB and Maximum=49GB (this one
>> is accumulated due to prior failed ones), Average=8.44GB.
>>
>> Although size is huge, I have enough space on EC2 instance in which I'm
>> running job. I'm using RocksDB for checkpointing.
>>
>> *Logs does not have any useful information to understand why checkpoints
>> are expiring/failing, can someone please point me to tools that can be used
>> to investigate and understand why checkpoints are failing.*
>>
>> Also any other related suggestions are welcome.
>>
>>
>> Thanks,
>> Reva.
>>
>
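
For the expiring checkpoints discussed above, these are the checkpoint settings that are
typically tuned; a minimal sketch assuming Flink 1.9's CheckpointConfig, with purely
illustrative values and a placeholder pipeline:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointTuning {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Trigger a checkpoint every 5 minutes.
            env.enableCheckpointing(5 * 60 * 1000L);

            // Give slow or back-pressured tasks more time before the checkpoint expires.
            env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);

            // Leave breathing room between checkpoints so the job can catch up.
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000L);

            // Never run overlapping checkpoints.
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

            env.socketTextStream("localhost", 9999).print();  // placeholder pipeline
            env.execute("checkpoint tuning example");
        }
    }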


Re: Yarn Kerberos issue

2020-01-10 Thread Juan Gentile
The error we get is the following:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy 
Yarn session cluster
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:423)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:262)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
Caused by: java.lang.RuntimeException: Hadoop security with Kerberos is enabled 
but the login user does not have Kerberos credentials
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:490)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:416)
... 9 more

On 1/10/20, 3:36 PM, "Aljoscha Krettek"  wrote:

Hi,

Interesting! What problem are you seeing when you don't unset that 
environment variable? From reading UserGroupInformation.java our code 
should almost work when that environment variable is set.

Best,
Aljoscha

On 10.01.20 15:23, Juan Gentile wrote:
> Hello Aljoscha!
> 
> 
> 
> The way we send the DTs to spark is by setting an env variable 
(HADOOP_TOKEN_FILE_LOCATION) which interestingly we have to unset when we run 
Flink because even if we do kinit that variable affects somehow Flink and 
doesn’t work.
> 
> I’m not an expert but what you describe (We would need to modify the 
SecurityUtils [2] to create a UGI from tokens, i.e. 
UserGroupInformation.createRemoteUser() and then addCreds() or addToken()) 
makes sense.
> 
> If you are able to do this it would be great and help us a lot!
> 
> 
> 
> Thank you,
> 
> Juan
> 
> 
> 
> On 1/10/20, 3:13 PM, "Aljoscha Krettek"  wrote:
> 
> 
> 
>  Hi,
> 
> 
> 
>  it seems I hin send to early, my mail was missing a small part. This 
is
> 
>  the full mail again:
> 
> 
> 
>  to summarize and clarify various emails: currently, you can only use
> 
>  Kerberos authentication via tickets (i.e. kinit) or keytab. The 
relevant
> 
>  bit of code is in the Hadoop security module [1]. Here you can see 
that
> 
>  we either use keytab or try to login as a user.
> 
> 
> 
>  I think we should be able to extend this to also work with delegation
> 
>  tokens (DTs). We would need to modify the SecurityUtils [2] to 
create a
> 
>  UGI from tokens, i.e. UserGroupInformation.createRemoteUser() and 
then
> 
>  addCreds() or addToken(). In Spark, how do you pass the DTs to the
> 
>  system as a user?
> 
> 
> 
>  Best,
> 
>  Aljoscha
> 
> 
> 
>  [1]
> 
>  
https://github.com/apache/flink/blob/64e2f27640946bf3b1608d4d85585fe18891dcee/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68
> 
>  [2]
> 
>  
https://github.com/apache/flink/blob/2bbf4e5002d7657018ba0b53b7a3ed8ee3124da8/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java#L89
> 
> 
> 
>  On 10.01.20 15:02, Aljoscha Krettek wrote:
> 
>  > Hi Juan,
> 
>  >
> 
>  > to summarize and clarify various emails: currently, you can only 
use
> 
>  > Kerberos authentication via tickets (i.e. kinit) or keytab. The 
relevant
> 
>  > bit of code is in the Hadoop security module: [1]. Here you can 
see that
> 
>  > we either use keytab.
> 
>  >
> 
>  > I think we should be 

Re: StreamingFileSink doesn't close multipart uploads to s3?

2020-01-10 Thread Ken Krugler
Hi Kostas,

I didn’t see a follow-up to this, and have also run into this same issue of 
winding up with a bunch of .inprogress files when a bounded input stream ends 
and the job terminates.

When StreamingFileSink.close() is called, shouldn’t all buckets get 
auto-rolled, so that the .inprogress files become part-xxx files?

Thanks,

— Ken


> On Dec 9, 2019, at 6:56 PM, Jingsong Li  wrote:
> 
> Hi Kostas,
> 
> I  took a look to StreamingFileSink.close, it just delete all temporary 
> files. I know it is for failover. When Job fail, it should just delete temp 
> files for next restart.
> But for testing purposes, we just want to run a bounded streaming job. If 
> there is no checkpoint trigger, no one will move the final temp files to 
> output path, so the result of this job is wrong.
> Do you have any idea about this? Can we distinguish "fail close" from 
> "success finish close" in StreamingFileSink?
> 
> Best,
> Jingsong Lee
> 
> On Mon, Dec 9, 2019 at 10:32 PM Kostas Kloudas  > wrote:
> Hi Li,
> 
> This is the expected behavior. All the "exactly-once" sinks in Flink
> require checkpointing to be enabled.
> We will update the documentation to be clearer in the upcoming release.
> 
> Thanks a lot,
> Kostas
> 
> On Sat, Dec 7, 2019 at 3:47 AM Li Peng  > wrote:
> >
> > Ok I seem to have solved the issue by enabling checkpointing. Based on the 
> > docs (I'm using 1.9.0), it seemed like only 
> > StreamingFileSink.forBulkFormat() should've required checkpointing, but 
> > based on this experience, StreamingFileSink.forRowFormat() requires it too! 
> > Is this the intended behavior? If so, the docs should probably be updated.
> >
> > Thanks,
> > Li
> >
> > On Fri, Dec 6, 2019 at 2:01 PM Li Peng  > > wrote:
> >>
> >> Hey folks, I'm trying to get StreamingFileSink to write to s3 every 
> >> minute, with flink-s3-fs-hadoop, and based on the default rolling policy, 
> >> which is configured to "roll" every 60 seconds, I thought that would be 
> >> automatic (I interpreted rolling to mean actually close a multipart upload 
> >> to s3).
> >>
> >> But I'm not actually seeing files written to s3 at all, instead I see a 
> >> bunch of open multipart uploads when I check the AWS s3 console, for 
> >> example:
> >>
> >>  "Uploads": [
> >> {
> >> "Initiated": "2019-12-06T20:57:47.000Z",
> >> "Key": "2019-12-06--20/part-0-0"
> >> },
> >> {
> >> "Initiated": "2019-12-06T20:57:47.000Z",
> >> "Key": "2019-12-06--20/part-1-0"
> >> },
> >> {
> >> "Initiated": "2019-12-06T21:03:12.000Z",
> >> "Key": "2019-12-06--21/part-0-1"
> >> },
> >> {
> >> "Initiated": "2019-12-06T21:04:15.000Z",
> >> "Key": "2019-12-06--21/part-0-2"
> >> },
> >> {
> >> "Initiated": "2019-12-06T21:22:23.000Z"
> >> "Key": "2019-12-06--21/part-0-3"
> >> }
> >> ]
> >>
> >> And these uploads are being open for a long time. So far after an hour, 
> >> none of the uploads have been closed. Is this the expected behavior? If I 
> >> wanted to get these uploads to actually write to s3 quickly, do I need to 
> >> configure the hadoop stuff to get that done, like setting a smaller 
> >> buffer/partition size to force it to upload?
> >>
> >> Thanks,
> >> Li
> 
> 
> -- 
> Best, Jingsong Lee

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr
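
A minimal sketch of the point discussed in this thread, i.e. that the row-format
StreamingFileSink also needs checkpointing enabled before in-progress files are finalized.
It assumes Flink 1.9's API; the S3 bucket, the socket source, and the interval are
placeholders:

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    public class S3SinkJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Part files are only committed when a checkpoint completes, so without
            // checkpointing the sink leaves .inprogress files / open multipart uploads.
            env.enableCheckpointing(60_000);

            StreamingFileSink<String> sink = StreamingFileSink
                    .forRowFormat(new Path("s3://my-bucket/output"),
                                  new SimpleStringEncoder<String>("UTF-8"))
                    .build();

            env.socketTextStream("localhost", 9999)  // placeholder source
               .addSink(sink);

            env.execute("streaming file sink example");
        }
    }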



Apache Flink - Sharing state in processors

2020-01-10 Thread M Singh
Hi:
I have a few questions about how state is shared between processors in Flink.

1. If I have a processor instantiated in the Flink app and use it multiple times in the job:
    (a) if the tasks are in the same slot - do they share the same processor on the taskmanager?
    (b) if the tasks are on the same node but in different slots - do they share the same processor on the taskmanager?

2. If I instantiate a single processor with local state and use it multiple times in the job:
    (a) if the tasks are in the same slot - do they share the same processor and state on the taskmanager?
    (b) if the tasks are on the same node but in different slots - do they share the same processor and state on the taskmanager?

3. If I instantiate multiple processors with a shared collection and use them multiple times in the job:
    (a) if the tasks are in the same slot - do they share the state on the taskmanager?
    (b) if the tasks are on the same node but in different slots - do they share the state on the taskmanager?

4. How do the above scenarios affect sharing of (a) operator state and (b) keyed state?

5. If I have a parallelism of > 1 and use keyBy - is each key handled by only one instance of the processor? I believe so, but wanted to confirm.

Thanks
Mans






Re: Running Flink on java 11

2020-01-10 Thread Chesnay Schepler
The error you got is due to an older asm version which is fixed for 1.10 
in https://issues.apache.org/jira/browse/FLINK-13467 .


On 10/01/2020 15:58, KristoffSC wrote:

Hi,
Yangze Guo, Chesnay Schepler thank you very much for your answers.

I have actually a funny setup.
So I have a Flink Job module, generated from Flink's maven archetype.
This module has all operators and Flink environment config and execution.
This module is compiled by maven with "maven.compiler.target" set to 1.8

However I'm using a 3rd party library that was compiled with java 11.
In order to build my main Job module I have to use JDK 11, however I still
have "maven.compiler.target" set to 1.8 there.

As a result, I have a Flink job jar, that has classes from Java 8 and 11.
Running javap -verbose proves it. All classes from Flink Job module are in
Java 8.

I can build Flink Job cluster image that is based on [1]. However i had to
change base image from openjdk:8-jre-alpine to
adoptopenjdk/openjdk11:jre-11.0.5_10-alpine plus remove installing
libc6-compat.

After rebuilding the docker image, Job cluster started and process messges.

On original openjdk:8-jre-alpine it was unable to start due issues with
loading classes from my 3rd party library (Unsupported major.minor version
exception)
So this seems to work.


However if I would change "maven.compiler.target" to Java 11 in my Flink Job
module, then Flink is unable to run the Job giving me this exception

job-cluster_1  | Caused by: java.lang.UnsupportedOperationException
job-cluster_1  |at
org.apache.flink.shaded.asm6.org.objectweb.asm.ClassVisitor.visitNestHostExperimental(ClassVisitor.java:158)
job-cluster_1  |at
org.apache.flink.shaded.asm6.org.objectweb.asm.ClassReader.accept(ClassReader.java:541)
job-cluster_1  |at
org.apache.flink.shaded.asm6.org.objectweb.asm.ClassReader.accept(ClassReader.java:391)
job-cluster_1  |at
org.apache.flink.api.java.ClosureCleaner.cleanThis0(ClosureCleaner.java:187)
job-cluster_1  |at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
job-cluster_1  |at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
job-cluster_1  |at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
job-cluster_1  |at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
job-cluster_1  |at
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
job-cluster_1  |at
org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:668)
job-cluster_1  |at
org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:645)
job-cluster_1  |at
com.epam.monity.job.StreamJob.doTheJob(StreamJob.java:140)
job-cluster_1  |at
com.epam.monity.job.StreamJob.main(StreamJob.java:46)
job-cluster_1  |at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
job-cluster_1  |at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
Source)
job-cluster_1  |at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source)
job-cluster_1  |at java.base/java.lang.reflect.Method.invoke(Unknown
Source)
job-cluster_1  |at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
job-cluster_1  |... 13 more



Long story short, seems that for now, Job module has to be compiled to 1.8
with JDK 11 if Java 11 libraries are used.

[1]
https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Running Flink on java 11

2020-01-10 Thread KristoffSC
Hi, 
Yangze Guo, Chesnay Schepler thank you very much for your answers.

I have actually a funny setup.
So I have a Flink Job module, generated from Flink's maven archetype.
This module has all operators and Flink environment config and execution.
This module is compiled by maven with "maven.compiler.target" set to 1.8

However I'm using a 3rd party library that was compiled with java 11.
In order to build my main Job module I have to use JDK 11, however I still
have "maven.compiler.target" set to 1.8 there.

As a result, I have a Flink job jar, that has classes from Java 8 and 11.
Running javap -verbose proves it. All classes from Flink Job module are in
Java 8.

I can build Flink Job cluster image that is based on [1]. However i had to
change base image from openjdk:8-jre-alpine to
adoptopenjdk/openjdk11:jre-11.0.5_10-alpine plus remove installing
libc6-compat.

After rebuilding the docker image, the Job cluster started and processed messages.

On the original openjdk:8-jre-alpine it was unable to start due to issues with
loading classes from my 3rd party library (Unsupported major.minor version
exception).
So this seems to work.


However if I would change "maven.compiler.target" to Java 11 in my Flink Job
module, then Flink is unable to run the Job giving me this exception

job-cluster_1  | Caused by: java.lang.UnsupportedOperationException
job-cluster_1  |at
org.apache.flink.shaded.asm6.org.objectweb.asm.ClassVisitor.visitNestHostExperimental(ClassVisitor.java:158)
job-cluster_1  |at
org.apache.flink.shaded.asm6.org.objectweb.asm.ClassReader.accept(ClassReader.java:541)
job-cluster_1  |at
org.apache.flink.shaded.asm6.org.objectweb.asm.ClassReader.accept(ClassReader.java:391)
job-cluster_1  |at
org.apache.flink.api.java.ClosureCleaner.cleanThis0(ClosureCleaner.java:187)
job-cluster_1  |at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
job-cluster_1  |at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
job-cluster_1  |at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
job-cluster_1  |at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
job-cluster_1  |at
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
job-cluster_1  |at
org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:668)
job-cluster_1  |at
org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:645)
job-cluster_1  |at
com.epam.monity.job.StreamJob.doTheJob(StreamJob.java:140)
job-cluster_1  |at
com.epam.monity.job.StreamJob.main(StreamJob.java:46)
job-cluster_1  |at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
job-cluster_1  |at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
Source)
job-cluster_1  |at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source)
job-cluster_1  |at java.base/java.lang.reflect.Method.invoke(Unknown
Source)
job-cluster_1  |at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
job-cluster_1  |... 13 more



Long story short, it seems that for now the Job module has to be compiled to 1.8
with JDK 11 if Java 11 libraries are used.

[1]
https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [Question] Failed to submit flink job to secure yarn cluster

2020-01-10 Thread Ethan Li
Hi Yangze,

Thanks for your reply. Those are the docs I have read and followed. (I was also 
able to set up a standalone Flink cluster with secure HDFS, ZooKeeper, and Kafka.)

Could you please let me know what I am missing? Thanks


Best,
Ethan

> On Jan 10, 2020, at 6:28 AM, Yangze Guo  wrote:
> 
> Hi, Ethan
> 
> You could first check your cluster following this guide and check if
> all the related config[2] set correctly.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/security-kerberos.html
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#security-kerberos-login-contexts
> 
> Best,
> Yangze Guo
> 
> On Fri, Jan 10, 2020 at 10:37 AM Ethan Li  wrote:
>> 
>> Hello
>> 
>> I was following  
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/yarn_setup.html#run-a-flink-job-on-yarn
>>  and trying to submit a flink job on yarn.
>> 
>> I downloaded flink-1.9.1 and pre-bundled Hadoop 2.8.3 from 
>> https://flink.apache.org/downloads.html#apache-flink-191. I used default 
>> configs except:
>> 
>> security.kerberos.login.keytab: userA.keytab
>> security.kerberos.login.principal: userA@REALM
>> 
>> 
>> I have a secure Yarn cluster set up already. Then I ran “ ./bin/flink run -m 
>> yarn-cluster -p 1 -yjm 1024m -ytm 1024m ./examples/streaming/WordCount.jar” 
>> and got the following errors:
>> 
>> 
>> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
>> deploy Yarn session cluster
>> at 
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:385)
>> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:251)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>> at 
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>> at 
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at 
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>> at 
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
>> Caused by: org.apache.hadoop.yarn.exceptions.YarnException: Failed to submit 
>> application_1578605412668_0005 to YARN : Failed to renew token: Kind: 
>> kms-dt, Service: host3.com:3456, Ident: (owner=userA, renewer=adminB, 
>> realUser=, issueDate=1578606224956, maxDate=1579211024956, 
>> sequenceNumber=32, masterKeyId=52)
>> at 
>> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.submitApplication(YarnClientImpl.java:275)
>> at 
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1004)
>> at 
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:507)
>> at 
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:378)
>> ... 9 more
>> 
>> 
>> Full client 
>> log:https://gist.github.com/Ethanlm/221284bcaa272270a799957dc05b94fd
>> Resource manager log: 
>> https://gist.github.com/Ethanlm/ecd0a3eb25582ad6b1552927fc0e5c47
>> Hostname, IP address, username and etc. are anonymized.
>> 
>> 
>> Not sure how to proceed further. Wondering if anyone in the community has 
>> encountered this before. Thank you very much for your time!
>> 
>> Best,
>> Ethan
>> 



Re: Yarn Kerberos issue

2020-01-10 Thread Aljoscha Krettek

Hi,

Interesting! What problem are you seeing when you don't unset that 
environment variable? From reading UserGroupInformation.java our code 
should almost work when that environment variable is set.


Best,
Aljoscha

On 10.01.20 15:23, Juan Gentile wrote:

Hello Aljoscha!



The way we send the DTs to spark is by setting an env variable 
(HADOOP_TOKEN_FILE_LOCATION) which interestingly we have to unset when we run 
Flink because even if we do kinit that variable affects somehow Flink and 
doesn’t work.

I’m not an expert but what you describe (We would need to modify the 
SecurityUtils [2] to create a UGI from tokens, i.e. 
UserGroupInformation.createRemoteUser() and then addCreds() or addToken()) 
makes sense.

If you are able to do this it would be great and help us a lot!



Thank you,

Juan



On 1/10/20, 3:13 PM, "Aljoscha Krettek"  wrote:



 Hi,



 it seems I hin send to early, my mail was missing a small part. This is

 the full mail again:



 to summarize and clarify various emails: currently, you can only use

 Kerberos authentication via tickets (i.e. kinit) or keytab. The relevant

 bit of code is in the Hadoop security module [1]. Here you can see that

 we either use keytab or try to login as a user.



 I think we should be able to extend this to also work with delegation

 tokens (DTs). We would need to modify the SecurityUtils [2] to create a

 UGI from tokens, i.e. UserGroupInformation.createRemoteUser() and then

 addCreds() or addToken(). In Spark, how do you pass the DTs to the

 system as a user?



 Best,

 Aljoscha



 [1]

 
https://github.com/apache/flink/blob/64e2f27640946bf3b1608d4d85585fe18891dcee/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68

 [2]

 
https://github.com/apache/flink/blob/2bbf4e5002d7657018ba0b53b7a3ed8ee3124da8/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java#L89



 On 10.01.20 15:02, Aljoscha Krettek wrote:

 > Hi Juan,

 >

 > to summarize and clarify various emails: currently, you can only use

 > Kerberos authentication via tickets (i.e. kinit) or keytab. The relevant

 > bit of code is in the Hadoop security module: [1]. Here you can see that

 > we either use keytab.

 >

 > I think we should be able to extend this to also work with delegation

 > tokens (DTs). In Spark, how do you pass the DTs to the system as a user?

 >

 > Best,

 > Aljoscha

 >

 > [1]

 > 
https://github.com/apache/flink/blob/64e2f27640946bf3b1608d4d85585fe18891dcee/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68

 >

 >

 > On 06.01.20 09:52, Yang Wang wrote:

 >> I guess you have set some kerberos related configuration in spark

 >> jobs. For

 >> Flink, you need

 >> to do this too by the following configs. And the keytab file should

 >> existed

 >> on Flink client. In your

 >> environment, it means the scheduler(oozie) could access the keytab file.

 >>

 >> security.kerberos.login.keytab

 >> security.kerberos.login.principal

 >> security.kerberos.login.contexts

 >>

 >>

 >>

 >> Best,

 >> Yang

 >>

 >> Juan Gentile wrote on Mon, Jan 6, 2020 at 3:55 PM:

 >>

 >>> Hello Rong, Chesnay,

 >>>

 >>>

 >>>

 >>> Thank you for your answer, the way we are trying to launch the job is

 >>> through a scheduler (similar to oozie) where we have a keytab for the

 >>> scheduler user and with that keytab we get delegation tokens

 >>> impersonating

 >>> the right user (owner of the job). But the only way I was able to

 >>> make this

 >>> work is by getting a ticket (through kinit).

 >>>

 >>> As a comparison, if I launch a spark job (without doing kinit) just 
with

 >>> the delegation tokens, it works okay. So I guess Spark does something

 >>> extra.

 >>>

 >>> This is as far as I could go but at this point I’m not sure if 

Re: Yarn Kerberos issue

2020-01-10 Thread Juan Gentile
Hello Aljoscha!



The way we send the DTs to Spark is by setting an env variable 
(HADOOP_TOKEN_FILE_LOCATION), which, interestingly, we have to unset when we run 
Flink, because even if we do kinit that variable somehow affects Flink and it 
doesn’t work.

I’m not an expert but what you describe (We would need to modify the 
SecurityUtils [2] to create a UGI from tokens, i.e. 
UserGroupInformation.createRemoteUser() and then addCreds() or addToken()) 
makes sense.

If you are able to do this it would be great and help us a lot!



Thank you,

Juan



On 1/10/20, 3:13 PM, "Aljoscha Krettek"  wrote:



Hi,



it seems I hin send to early, my mail was missing a small part. This is

the full mail again:



to summarize and clarify various emails: currently, you can only use

Kerberos authentication via tickets (i.e. kinit) or keytab. The relevant

bit of code is in the Hadoop security module [1]. Here you can see that

we either use keytab or try to login as a user.



I think we should be able to extend this to also work with delegation

tokens (DTs). We would need to modify the SecurityUtils [2] to create a

UGI from tokens, i.e. UserGroupInformation.createRemoteUser() and then

addCreds() or addToken(). In Spark, how do you pass the DTs to the

system as a user?



Best,

Aljoscha



[1]


https://github.com/apache/flink/blob/64e2f27640946bf3b1608d4d85585fe18891dcee/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68

[2]


https://github.com/apache/flink/blob/2bbf4e5002d7657018ba0b53b7a3ed8ee3124da8/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java#L89



On 10.01.20 15:02, Aljoscha Krettek wrote:

> Hi Juan,

>

> to summarize and clarify various emails: currently, you can only use

> Kerberos authentication via tickets (i.e. kinit) or keytab. The relevant

> bit of code is in the Hadoop security module: [1]. Here you can see that

> we either use keytab.

>

> I think we should be able to extend this to also work with delegation

> tokens (DTs). In Spark, how do you pass the DTs to the system as a user?

>

> Best,

> Aljoscha

>

> [1]

> 
https://github.com/apache/flink/blob/64e2f27640946bf3b1608d4d85585fe18891dcee/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68

>

>

> On 06.01.20 09:52, Yang Wang wrote:

>> I guess you have set some kerberos related configuration in spark

>> jobs. For

>> Flink, you need

>> to do this too by the following configs. And the keytab file should

>> existed

>> on Flink client. In your

>> environment, it means the scheduler(oozie) could access the keytab file.

>>

>> security.kerberos.login.keytab

>> security.kerberos.login.principal

>> security.kerberos.login.contexts

>>

>>

>>

>> Best,

>> Yang

>>

>> Juan Gentile wrote on Mon, Jan 6, 2020 at 3:55 PM:

>>

>>> Hello Rong, Chesnay,

>>>

>>>

>>>

>>> Thank you for your answer, the way we are trying to launch the job is

>>> through a scheduler (similar to oozie) where we have a keytab for the

>>> scheduler user and with that keytab we get delegation tokens

>>> impersonating

>>> the right user (owner of the job). But the only way I was able to

>>> make this

>>> work is by getting a ticket (through kinit).

>>>

>>> As a comparison, if I launch a spark job (without doing kinit) just with

>>> the delegation tokens, it works okay. So I guess Spark does something

>>> extra.

>>>

>>> This is as far as I could go but at this point I’m not sure if this is

>>> something just not supported by Flink or I’m doing something wrong.

>>>

>>>

>>>

>>> Thank you,

>>>

>>> Juan

>>>

>>>

>>>

>>> *From: *Rong Rong 

>>> *Date: *Saturday, January 4, 2020 at 6:06 PM

>>> *To: *Chesnay Schepler 

>>> *Cc: *Juan Gentile , 

Re: How can I find out which key group belongs to which subtask

2020-01-10 Thread Till Rohrmann
Hi,

you would need to set the co-location constraint in order to ensure that
the sub-tasks of operators are deployed to the same machine. It effectively
means that subtasks a_i, b_i  of operator a and b will be deployed to the
same slot. This feature is not super well exposed but you can take a look
at [1] to see how it can be used.

[1] https://issues.apache.org/jira/browse/FLINK-9809

Cheers,
Till
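
For illustration, a rough sketch of what using that constraint could look like. This leans on
the getTransformation()/setCoLocationGroupKey hook referenced in FLINK-9809, which is internal
and not a stable public API, so treat the exact calls as an assumption; the source, the key,
and the group name are placeholders:

    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CoLocationExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            SingleOutputStreamOperator<String> a = env
                    .socketTextStream("localhost", 9999)   // placeholder source
                    .map(String::toUpperCase)
                    .name("operator-a");

            SingleOutputStreamOperator<String> b = a
                    .keyBy(value -> value)
                    .reduce((left, right) -> left + right)
                    .name("operator-b");

            // Subtasks a_i and b_i that share the same co-location group key
            // are scheduled into the same slot (assumed internal hook, see FLINK-9809).
            a.getTransformation().setCoLocationGroupKey("a-b");
            b.getTransformation().setCoLocationGroupKey("a-b");

            b.print();
            env.execute("co-location example");
        }
    }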

On Fri, Jan 10, 2020 at 9:08 AM Zhijiang  wrote:

> Only chained operators can avoid record serialization cost, but the
> chaining mode can not support keyed stream.
> If you want to deploy downstream with upstream in the same task manager,
> it can avoid network shuffle cost which can still get performance benefits.
> As I know @Till Rohrmann has implemented some enhancements in scheduler
> layer to support such requirement in release-1.10. You can have a try when
> the rc candidate is ready.
>
> Best,
> Zhijiang
>
> --
> From:杨东晓 
> Send Time:2020 Jan. 10 (Fri.) 02:10
> To:Congxian Qiu 
> Cc:user 
> Subject:Re: How can I find out which key group belongs to which subtask
>
> Thanks Congxian!
>  My purpose is not only make data goes into one same subtask but the
> specific subtask which belongs to same taskmanager with upstream record.
> The key idea is to avoid shuffling  between taskmanagers.
> I think the KeyGroupRangeAssignment.java
> 
> explained a lot about how to get keygroup and subtask context that can make
> that happen.
> Do you know if there are still  serialization happening while data
> transferred between operator in same taskmanager?
> Thanks.
>
> Congxian Qiu wrote on Thu, Jan 9, 2020 at 1:55 AM:
> Hi
>
> If you just want to make sure some key goes into the same subtask, does
> custom key selector[1] help?
>
> For the keygroup and subtask information, you can ref to
> KeyGroupRangeAssignment[2] for more info, and the max parallelism logic you
> can ref to doc[3]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-key-selector-functions
> [2]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
> [3]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism
>
> Best,
> Congxian
>
>
> 杨东晓 wrote on Thu, Jan 9, 2020 at 7:47 AM:
> Hi , I'm trying to do some optimize about Flink 'keyby' processfunction.
> Is there any possible I can find out one key belongs to which key-group
> and essentially find out one key-group belongs to which subtask.
> The motivation I want to know that is we want to  force the data records
> from upstream still goes to same taskmanager downstream subtask .Which
> means even if we use a keyedstream function we still want no cross jvm
> communication happened during run time.
> And if we can achieve that , can we also avoid the expensive cost for
> record serialization because data is only transferred in same taskmanager
> jvm instance?
>
> Thanks.
>
>
>


Re: Yarn Kerberos issue

2020-01-10 Thread Aljoscha Krettek

Hi,

it seems I hit send too early; my mail was missing a small part. This is 
the full mail again:


to summarize and clarify various emails: currently, you can only use 
Kerberos authentication via tickets (i.e. kinit) or keytab. The relevant 
bit of code is in the Hadoop security module [1]. Here you can see that 
we either use the keytab or try to log in as a user.


I think we should be able to extend this to also work with delegation 
tokens (DTs). We would need to modify the SecurityUtils [2] to create a 
UGI from tokens, i.e. UserGroupInformation.createRemoteUser() and then 
addCreds() or addToken(). In Spark, how do you pass the DTs to the 
system as a user?


Best,
Aljoscha

[1] 
https://github.com/apache/flink/blob/64e2f27640946bf3b1608d4d85585fe18891dcee/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68
[2] 
https://github.com/apache/flink/blob/2bbf4e5002d7657018ba0b53b7a3ed8ee3124da8/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java#L89
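
To make the idea concrete, here is a rough sketch of building a UGI from a delegation-token
file with Hadoop's UserGroupInformation API. This is an assumption of what such an extension
could look like, not existing Flink code; the user name is a placeholder and the token file is
assumed to be the one HADOOP_TOKEN_FILE_LOCATION points to:

    import java.io.File;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;

    public class DelegationTokenLogin {
        public static void main(String[] args) throws Exception {
            // Read the delegation tokens written by the scheduler.
            File tokenFile = new File(System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
            Credentials credentials = Credentials.readTokenStorageFile(tokenFile, new Configuration());

            // Create a UGI for the submitting user and attach the delegation tokens to it.
            UserGroupInformation ugi = UserGroupInformation.createRemoteUser("jobOwner"); // placeholder
            ugi.addCredentials(credentials);

            // Anything run inside doAs() then authenticates with the attached tokens.
            ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
                // e.g. deploy the YARN session / submit the job here
                return null;
            });
        }
    }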


On 10.01.20 15:02, Aljoscha Krettek wrote:

Hi Juan,

to summarize and clarify various emails: currently, you can only use 
Kerberos authentication via tickets (i.e. kinit) or keytab. The relevant 
bit of code is in the Hadoop security module: [1]. Here you can see that 
we either use keytab.


I think we should be able to extend this to also work with delegation 
tokens (DTs). In Spark, how do you pass the DTs to the system as a user?


Best,
Aljoscha

[1] 
https://github.com/apache/flink/blob/64e2f27640946bf3b1608d4d85585fe18891dcee/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68 



On 06.01.20 09:52, Yang Wang wrote:
I guess you have set some kerberos related configuration in spark 
jobs. For

Flink, you need
to do this too by the following configs. And the keytab file should 
existed

on Flink client. In your
environment, it means the scheduler(oozie) could access the keytab file.

security.kerberos.login.keytab
security.kerberos.login.principal
security.kerberos.login.contexts



Best,
Yang

Juan Gentile wrote on Mon, Jan 6, 2020 at 3:55 PM:


Hello Rong, Chesnay,



Thank you for your answer, the way we are trying to launch the job is
through a scheduler (similar to oozie) where we have a keytab for the
scheduler user and with that keytab we get delegation tokens 
impersonating
the right user (owner of the job). But the only way I was able to 
make this

work is by getting a ticket (through kinit).

As a comparison, if I launch a spark job (without doing kinit) just with
the delegation tokens, it works okay. So I guess Spark does something 
extra.


This is as far as I could go but at this point I’m not sure if this is
something just not supported by Flink or I’m doing something wrong.



Thank you,

Juan



*From: *Rong Rong 
*Date: *Saturday, January 4, 2020 at 6:06 PM
*To: *Chesnay Schepler 
*Cc: *Juan Gentile , "user@flink.apache.org" <
user@flink.apache.org>, Oleksandr Nitavskyi 
*Subject: *Re: Yarn Kerberos issue



Hi Juan,



Chesnay was right. If you are using CLI to launch your session cluster
based on the document [1], you following the instruction to use kinit 
[2]

first seems to be one of the right way to go.

Another way of approaching it is to setup the kerberos settings in the
flink-conf.yaml file [3]. FlinkYarnSessionCli will be able to pick up 
your

keytab files and run the CLI securely.



As far as I know the option `security.kerberos.login.use-ticket-cache`
doesn't actually change the behavior of the authentication process, 
it is
more of a hint whether to use the ticket cache instantiated by 
`kinit`. If

you disable using the ticket cache, you will have to use the
"keytab/principle" approach - this doc [4] might be helpful to explain
better.



Thanks,

Rong





[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#start-flink-session 

 



[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#using-kinit-yarn-only 

 



[3]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#yarnmesos-mode 


Re: Yarn Kerberos issue

2020-01-10 Thread Aljoscha Krettek

Hi Juan,

to summarize and clarify various emails: currently, you can only use 
Kerberos authentication via tickets (i.e. kinit) or keytab. The relevant 
bit of code is in the Hadoop security module: [1]. Here you can see that 
we either use keytab.


I think we should be able to extend this to also work with delegation 
tokens (DTs). In Spark, how do you pass the DTs to the system as a user?


Best,
Aljoscha

[1] 
https://github.com/apache/flink/blob/64e2f27640946bf3b1608d4d85585fe18891dcee/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68


On 06.01.20 09:52, Yang Wang wrote:

I guess you have set some kerberos related configuration in spark jobs. For
Flink, you need
to do this too by the following configs. And the keytab file should existed
on Flink client. In your
environment, it means the scheduler(oozie) could access the keytab file.

security.kerberos.login.keytab
security.kerberos.login.principal
security.kerberos.login.contexts



Best,
Yang

Juan Gentile wrote on Mon, Jan 6, 2020 at 3:55 PM:


Hello Rong, Chesnay,



Thank you for your answer, the way we are trying to launch the job is
through a scheduler (similar to oozie) where we have a keytab for the
scheduler user and with that keytab we get delegation tokens impersonating
the right user (owner of the job). But the only way I was able to make this
work is by getting a ticket (through kinit).

As a comparison, if I launch a spark job (without doing kinit) just with
the delegation tokens, it works okay. So I guess Spark does something extra.

This is as far as I could go but at this point I’m not sure if this is
something just not supported by Flink or I’m doing something wrong.



Thank you,

Juan



*From: *Rong Rong 
*Date: *Saturday, January 4, 2020 at 6:06 PM
*To: *Chesnay Schepler 
*Cc: *Juan Gentile , "user@flink.apache.org" <
user@flink.apache.org>, Oleksandr Nitavskyi 
*Subject: *Re: Yarn Kerberos issue



Hi Juan,



Chesnay was right. If you are using CLI to launch your session cluster
based on the document [1], you following the instruction to use kinit [2]
first seems to be one of the right way to go.

Another way of approaching it is to setup the kerberos settings in the
flink-conf.yaml file [3]. FlinkYarnSessionCli will be able to pick up your
keytab files and run the CLI securely.



As far as I know the option `security.kerberos.login.use-ticket-cache`
doesn't actually change the behavior of the authentication process, it is
more of a hint whether to use the ticket cache instantiated by `kinit`. If
you disable using the ticket cache, you will have to use the
"keytab/principle" approach - this doc [4] might be helpful to explain
better.



Thanks,

Rong





[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#start-flink-session


[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#using-kinit-yarn-only


[3]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#yarnmesos-mode


[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#enabling-kerberos-authentication-for-versions-09-and-above-only




On Fri, Jan 3, 2020 at 7:20 AM Chesnay Schepler 
wrote:

 From what I understand from the documentation, if you want to use
delegation tokens you always first have to issue a ticket using kinit; so
you did everything correctly?



On 02/01/2020 13:00, Juan Gentile wrote:

Hello,



Im trying to submit a job (batch 

Re: Please suggest helpful tools

2020-01-10 Thread Congxian Qiu
Hi

For an expired checkpoint, you can find something like "Checkpoint xxx of job
xx expired before completing" in jobmanager.log; then you can go to the
checkpoint UI to find which tasks did not ack, and go to those tasks to see
what happened.

If the checkpoint was declined, you can find something like "Decline
checkpoint xxx by task xxx of job xxx at xxx." in jobmanager.log; in this
case, you can go to that task directly to find out why the checkpoint failed.

Best,
Congxian


Yun Tang wrote on Fri, Jan 10, 2020 at 7:31 PM:

> Hi Eva
>
> If checkpoint failed, please view the web UI or jobmanager log to see why
> checkpoint failed, might be declined by some specific task.
>
> If checkpoint expired, you can also access the web UI to see which tasks
> did not respond in time, some hot task might not be able to respond in
> time. Generally speaking, checkpoint expired is mostly caused by back
> pressure which led the checkpoint barrier did not arrive in time. Resolve
> the back pressure could help the checkpoint finished before timeout.
>
> I think the doc of monitoring web UI for checkpoint [1] and back pressure
> [2] could help you.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/checkpoint_monitoring.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/back_pressure.html
>
> Best
> Yun Tang
> --
> *From:* Eva Eva 
> *Sent:* Friday, January 10, 2020 10:29
> *To:* user 
> *Subject:* Please suggest helpful tools
>
> Hi,
>
> I'm running Flink job on 1.9 version with blink planner.
>
> My checkpoints are timing out intermittently, but as state grows they are
> timing out more and more often eventually killing the job.
>
> Size of the state is large with Minimum=10.2MB and Maximum=49GB (this one
> is accumulated due to prior failed ones), Average=8.44GB.
>
> Although size is huge, I have enough space on EC2 instance in which I'm
> running job. I'm using RocksDB for checkpointing.
>
> *Logs does not have any useful information to understand why checkpoints
> are expiring/failing, can someone please point me to tools that can be used
> to investigate and understand why checkpoints are failing.*
>
> Also any other related suggestions are welcome.
>
>
> Thanks,
> Reva.
>


Re: [Question] Failed to submit flink job to secure yarn cluster

2020-01-10 Thread Yangze Guo
Hi, Ethan

You could first check your cluster following this guide [1] and check if
all the related config [2] is set correctly.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/security-kerberos.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#security-kerberos-login-contexts

Best,
Yangze Guo

On Fri, Jan 10, 2020 at 10:37 AM Ethan Li  wrote:
>
> Hello
>
> I was following  
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/yarn_setup.html#run-a-flink-job-on-yarn
>  and trying to submit a flink job on yarn.
>
> I downloaded flink-1.9.1 and pre-bundled Hadoop 2.8.3 from 
> https://flink.apache.org/downloads.html#apache-flink-191. I used default 
> configs except:
>
> security.kerberos.login.keytab: userA.keytab
> security.kerberos.login.principal: userA@REALM
>
>
> I have a secure Yarn cluster set up already. Then I ran “ ./bin/flink run -m 
> yarn-cluster -p 1 -yjm 1024m -ytm 1024m ./examples/streaming/WordCount.jar” 
> and got the following errors:
>
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
> deploy Yarn session cluster
> at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:385)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:251)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.hadoop.yarn.exceptions.YarnException: Failed to submit 
> application_1578605412668_0005 to YARN : Failed to renew token: Kind: kms-dt, 
> Service: host3.com:3456, Ident: (owner=userA, renewer=adminB, realUser=, 
> issueDate=1578606224956, maxDate=1579211024956, sequenceNumber=32, 
> masterKeyId=52)
> at 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.submitApplication(YarnClientImpl.java:275)
> at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1004)
> at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:507)
> at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:378)
> ... 9 more
>
>
> Full client 
> log:https://gist.github.com/Ethanlm/221284bcaa272270a799957dc05b94fd
> Resource manager log: 
> https://gist.github.com/Ethanlm/ecd0a3eb25582ad6b1552927fc0e5c47
> Hostname, IP address, username and etc. are anonymized.
>
>
> Not sure how to proceed further. Wondering if anyone in the community has 
> encountered this before. Thank you very much for your time!
>
> Best,
> Ethan
>


Re: Flink Job claster scalability

2020-01-10 Thread Yangze Guo
Hi KristoffSC

As Zhu said, Flink enables slot sharing[1] by default. This feature has
nothing to do with the resources of your cluster; its benefit is described
in [1] as well. I mean, Flink will not detect how many slots your cluster
has and adjust its behavior to that number. If you want to make the best use
of your cluster, you can increase the parallelism of the vertex that has the
largest parallelism, or "disable" slot sharing as described in [2]. IMO, the
first way matches your purpose; see the sketch below.
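
A rough illustration (someInputStream, MyHeavyMapFunction, and the numbers are
placeholders for the example, not taken from your job):

// Raising the parallelism of the heaviest vertex lets it occupy more of the free slots.
// Putting it into its own slot sharing group additionally keeps its subtasks out of
// the slots shared with the rest of the pipeline.
SingleOutputStreamOperator<String> heavy = someInputStream
        .map(new MyHeavyMapFunction())   // placeholder operator
        .setParallelism(12)              // placeholder parallelism
        .slotSharingGroup("heavy");      // optional: only if you really want separate slots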

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#task-chaining-and-resource-groups

Best,
Yangze Guo

On Fri, Jan 10, 2020 at 6:49 PM KristoffSC
 wrote:
>
> Hi Zhu Zhu,
> well, in my last test I did not change the job config, so I did not change
> the parallelism of any operator and I did not change the policy regarding
> slot sharing (it stays at the default). Operator chaining is set to true,
> without any extra actions like "start new chain", "disable chain", etc.
>
> What I assumed, however, is that Flink will try to find the most efficient way
> to use the available resources during job submission.
>
> In the first case, where I had only 6 task managers (which matches the max
> parallelism of my JobVertex), Flink reused some task slots. Adding extra task
> slots was not effective, for the reason described by David. This is
> understandable.
>
> However, I was assuming that if I submit my job on a cluster that has more
> than 6 task managers, Flink will not share task slots by default. That did
> not happen. Flink deployed the job in the same way regardless of the extra
> resources.
>
>
> So the conclusion is that simply resubmitting the job will not work in this case,
> and actually I can't have any certainty that it will, since in my case Flink
> still reuses task slots.
>
> If this were a production case, I would have to do a test job
> submission on a testing environment and potentially change the job. Not the config,
> but adding slot sharing groups, etc.
> So if this were a production case I would not be able to react fast; I
> would have to deploy a new version of my app/job, which could be problematic.
>
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Running Flink on java 11

2020-01-10 Thread Chesnay Schepler

In regards to what we test:

We run our tests against Java 8 *and* Java 11, with the compilation and 
testing being done with the same JDK.
In other words, we don't check whether Flink compiled with JDK 8 runs on 
JDK 11, but we currently have no reason to believe that there is a 
problem (and some anecdotal evidence to support this).
Some tests are excluded from Java 11; some due to library 
incompatibility (e.g., Cassandra), some because our docker images are 
still using Java 8 (anything docker-related is _not_ tested for 
Java 11, nor are the Yarn end-to-end tests), and some for currently unknown 
reasons (the Kafka end-to-end test).


You _should_ be free to use any JDK 11 exclusive APIs within your code, 
but we haven't tested it specifically.


You don't have to re-compile Flink; the binary that we will be releasing 
should work on both Java 8 and 11.


Do note that we have not done any tests on JDK 12+.

On 10/01/2020 10:13, Krzysztof Chmielewski wrote:

Hi,
Thank you for your answer. Btw it seems that you sent the reply only 
to my address and not to the mailing list :)


I'm looking forward to try out 1.10-rc then.

Regarding second thing you wrote, that
/"on Java 11, all the tests(including end to end tests) would be run 
with Java 11 profile now."/
I'm not sure if I get that fully. You meant that currently Flink 
builds are running on java 11?


I was not rebuilding Flink 1.9.1 sources with JDK 11. I just ran 1.9.1 
build on JRE 11 locally on my machine and
I also modify Job Cluster Dockerfile to use openjdk:13-jdk-alpine as a 
base image instead openjdk:8-jre-alpine.


Are here any users who are currently running Flink on Java 11 or 
higher? What are your experiences?


Thanks,

pt., 10 sty 2020 o 03:14 Yangze Guo > napisał(a):


Hi, Krzysztof

Regarding the release-1.10, the community is now focus on this effort.
I believe we will have our first release candidate soon.

Regarding the issue when running Flink on Java 11, all the
tests(including end to end tests) would be run with Java 11 profile
now. If you meet any problem, feel free to open a new JIRA ticket or
ask in user/dev ML.

Best,
Yangze Guo

On Fri, Jan 10, 2020 at 1:11 AM KristoffSC
mailto:krzysiek.chmielew...@gmail.com>> wrote:
>
> Hi guys,
> well We have requirement in our project to use Java 11, although
we would
> really like to use Flink because it seems to match our needs
perfectly.
>
> We were testing it on java 1.8 and all looks fine.
> We tried to run it on Java 11 and also looks fine, at least for now.
>
> We were also running this as a Job Cluster, and since those
images [1] are
> based on openjdk:8-jre-alpine we switch to java 13-jdk-alpine.
Cluster
> started and submitted the job. All seemed fine.
>
> The Job and 3rd party library that this job is using were
compiled with Java
> 11.
> I was looking for any posts related to java 11 issues and I've
found this
> [2] one.
> We are also aware of ongoing FLINK-10725 [3] but this is
assigned to 1.10
> FLink version
>
> Having all of this, I would like to ask few questions
>
> 1. Is there any release date planed for 1.10?
> 2. Are you aware of any issues regarding running Flink on Java 11?
> 3. If my Job code would not use any code features from java 11,
would flink
> handle it when running on java 11? Or they are some internal
functionalities
> that would not be working on Java 11 (things that are using
unsafe or
> reflections?)
>
> Thanks,
> Krzysztof
>
>
> [1]
>

https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md
> [2]
>

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/UnsupportedOperationException-from-org-apache-flink-shaded-asm6-org-objectweb-asm-ClassVisitor-visit1-td28571.html
> [3] https://issues.apache.org/jira/browse/FLINK-10725
>
>
>
> --
> Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Running Flink on java 11

2020-01-10 Thread Yangze Guo
Hi Krzysztof

All the tests run with Java 11 only after FLINK-13457[1], whose fix version
is set to 1.10. So I fear 1.9.1 is not guaranteed to run on
Java 11. I suggest you wait for the 1.10 release.

[1]https://issues.apache.org/jira/browse/FLINK-13457

Best,
Yangze Guo

On Fri, Jan 10, 2020 at 5:13 PM Krzysztof Chmielewski
 wrote:
>
> Hi,
> Thank you for your answer. Btw it seams that you send the replay only to my 
> address and not to the mailing list :)
>
> I'm looking forward to try out 1.10-rc then.
>
> Regarding second thing you wrote, that
> "on Java 11, all the tests(including end to end tests) would be run with Java 
> 11 profile now."
> I'm not sure if I get that fully. You meant that currently Flink builds are 
> running on java 11?
>
> I was not rebuilding Flink 1.9.1 sources with JDK 11. I just ran 1.9.1 build 
> on JRE 11 locally on my machine and
> I also modify Job Cluster Dockerfile to use openjdk:13-jdk-alpine as a base 
> image instead openjdk:8-jre-alpine.
>
> Are here any users who are currently running Flink on Java 11 or higher? What 
> are your experiences?
>
> Thanks,
>
> pt., 10 sty 2020 o 03:14 Yangze Guo  napisał(a):
>>
>> Hi, Krzysztof
>>
>> Regarding the release-1.10, the community is now focus on this effort.
>> I believe we will have our first release candidate soon.
>>
>> Regarding the issue when running Flink on Java 11, all the
>> tests(including end to end tests) would be run with Java 11 profile
>> now. If you meet any problem, feel free to open a new JIRA ticket or
>> ask in user/dev ML.
>>
>> Best,
>> Yangze Guo
>>
>> On Fri, Jan 10, 2020 at 1:11 AM KristoffSC
>>  wrote:
>> >
>> > Hi guys,
>> > well We have requirement in our project to use Java 11, although we would
>> > really like to use Flink because it seems to match our needs perfectly.
>> >
>> > We were testing it on java 1.8 and all looks fine.
>> > We tried to run it on Java 11 and also looks fine, at least for now.
>> >
>> > We were also running this as a Job Cluster, and since those images [1] are
>> > based on openjdk:8-jre-alpine we switch to java 13-jdk-alpine. Cluster
>> > started and submitted the job. All seemed fine.
>> >
>> > The Job and 3rd party library that this job is using were compiled with 
>> > Java
>> > 11.
>> > I was looking for any posts related to java 11 issues and I've found this
>> > [2] one.
>> > We are also aware of ongoing FLINK-10725 [3] but this is assigned to 1.10
>> > FLink version
>> >
>> > Having all of this, I would like to ask few questions
>> >
>> > 1. Is there any release date planed for 1.10?
>> > 2. Are you aware of any issues regarding running Flink on Java 11?
>> > 3. If my Job code would not use any code features from java 11, would flink
>> > handle it when running on java 11? Or they are some internal 
>> > functionalities
>> > that would not be working on Java 11 (things that are using unsafe or
>> > reflections?)
>> >
>> > Thanks,
>> > Krzysztof
>> >
>> >
>> > [1]
>> > https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md
>> > [2]
>> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/UnsupportedOperationException-from-org-apache-flink-shaded-asm6-org-objectweb-asm-ClassVisitor-visit1-td28571.html
>> > [3] https://issues.apache.org/jira/browse/FLINK-10725
>> >
>> >
>> >
>> > --
>> > Sent from: 
>> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Please suggest helpful tools

2020-01-10 Thread Yun Tang
Hi Eva

If a checkpoint failed, please check the web UI or the jobmanager log to see why 
it failed; it might have been declined by some specific task.

If a checkpoint expired, you can also check the web UI to see which tasks did 
not respond in time; some hot task might not be able to respond quickly enough. 
Generally speaking, checkpoint expiration is mostly caused by back pressure, which 
prevents the checkpoint barrier from arriving in time. Resolving the back pressure 
could help the checkpoint finish before the timeout.
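
If the back pressure cannot be removed right away, giving checkpoints more time
and switching RocksDB to incremental checkpoints sometimes helps as a stopgap.
A minimal sketch (the interval, timeout, and path are placeholders, not
recommendations), e.g. inside the job's main method:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5 * 60 * 1000L);                             // trigger a checkpoint every 5 minutes
env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);     // let a slow checkpoint run up to 30 minutes
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000L);
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true)); // true = incremental checkpoints
// ... build the job and call env.execute() as usual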

I think the doc of monitoring web UI for checkpoint [1] and back pressure [2] 
could help you.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/checkpoint_monitoring.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/back_pressure.html

Best
Yun Tang

From: Eva Eva 
Sent: Friday, January 10, 2020 10:29
To: user 
Subject: Please suggest helpful tools

Hi,

I'm running Flink job on 1.9 version with blink planner.

My checkpoints are timing out intermittently, but as state grows they are 
timing out more and more often eventually killing the job.

Size of the state is large with Minimum=10.2MB and Maximum=49GB (this one is 
accumulated due to prior failed ones), Average=8.44GB.

Although size is huge, I have enough space on EC2 instance in which I'm running 
job. I'm using RocksDB for checkpointing.

The logs do not have any useful information to understand why checkpoints are 
expiring/failing. Can someone please point me to tools that can be used to 
investigate and understand why checkpoints are failing?

Also any other related suggestions are welcome.


Thanks,
Reva.


Re: Flink Job claster scalability

2020-01-10 Thread KristoffSC
Hi Zhu Zhu,
well, in my last test I did not change the job config, so I did not change
the parallelism of any operator and I did not change the policy regarding
slot sharing (it stays at the default). Operator chaining is set to true,
without any extra actions like "start new chain", "disable chain", etc.

What I assumed, however, is that Flink will try to find the most efficient way
to use the available resources during job submission.

In the first case, where I had only 6 task managers (which matches the max
parallelism of my JobVertex), Flink reused some task slots. Adding extra task
slots was not effective, for the reason described by David. This is
understandable.

However, I was assuming that if I submit my job on a cluster that has more
than 6 task managers, Flink will not share task slots by default. That did
not happen. Flink deployed the job in the same way regardless of the extra
resources.


So the conclusion is that simply resubmitting the job will not work in this case,
and actually I can't have any certainty that it will, since in my case Flink
still reuses task slots.

If this were a production case, I would have to do a test job
submission on a testing environment and potentially change the job. Not the config,
but adding slot sharing groups, etc.
So if this were a production case I would not be able to react fast; I
would have to deploy a new version of my app/job, which could be problematic.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Re: flink savepoint checkpoint

2020-01-10 Thread muyexm329
Actually this mainly depends on your checkpoint interval. Like rewinding a video, they are two
different rewind points: a savepoint can produce checkpoint data at the current moment, while the
automatic checkpoint may have produced its data at an earlier point in time (because when you cancel
the job, the next automatic checkpoint may not have been due yet). Both work; one is just earlier than
the other, and this also determines how quickly your job can be restored. For production jobs that
need frequent changes, savepoints are very practical.
Personally I think that when a job fails, no matter how it fails (unless you restore from a savepoint),
it will definitely go back to the last automatic checkpoint, not to a savepoint.


 Original Message 
From: amen...@163.com
To: user-zh
Sent: 2020-01-10 (Fri) 17:58
Subject: Re: Re: flink savepoint checkpoint


Hi, I understand that using stop to stop a job and trigger a savepoint will emit a max_watermark and
register event-time timers before stopping. May I ask: if the job is killed directly via yarn, does that
count as cancel, stop, or something else? amen...@163.com From: Congxian Qiu Date: 
2020-01-10 17:16 To: user-zh Subject: Re: flink savepoint checkpoint Hi From Flink's 
point of view, a checkpoint is used to recover a job when a failover happens during execution, while a 
savepoint is used to reuse state across jobs. In addition, starting from 1.9 you can try 
StopWithSavepoint[1]; there is also another community issue working on StopWithCheckpoint[2]. [1] 
https://issues.apache.org/jira/browse/FLINK-11458 [2] 
https://issues.apache.org/jira/browse/FLINK-12619 Best, Congxian zhisheng 
 于2020年1月10日周五 上午11:39写道: > 
Hi, as I understand it, this setting controls whether the previous checkpoints are cleaned up when the job is cancelled, but that checkpoint > 
is not necessarily the latest state of the job. If you trigger a savepoint with the cancel command, that state is the latest and most complete one. > > Best! > zhisheng > > 
Px New <15701181132mr@gmail.com> 于2020年1月10日周五 上午10:58写道: > > > Hello, 
regarding your question, Flink does have a configuration for this: when the program is stopped, the checkpoint is additionally retained > > --> > > > > > > > 
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 > > > > > > lucas.wu  于2019年12月11日周三 上午11:56写道: > > 
> Hi all: > > > > > > > 
I'd like to discuss a question: why are Flink savepoints designed to be triggered manually? If no savepoint is taken when the program is stopped, the previously saved state cannot be used on restart. Why not follow Spark's approach: take checkpoints periodically, and then on startup just specify the checkpoint (ck) path to continue from where the last run left off.
 > > >

Re: Re: flink savepoint checkpoint

2020-01-10 Thread amen...@163.com
Hi, I understand that using stop to stop a job and trigger a savepoint will emit a max_watermark and
register event-time timers before stopping. May I ask: if the job is killed directly via yarn, does that count as cancel, stop, or something else?



amen...@163.com
 
From: Congxian Qiu
Date: 2020-01-10 17:16
To: user-zh
Subject: Re: flink savepoint checkpoint
Hi
From Flink's point of view, a checkpoint is used to recover a job when a failover happens during
execution, while a savepoint is used to reuse state across jobs.
In addition, starting from 1.9 you can try StopWithSavepoint[1]; there is also another community
issue working on StopWithCheckpoint[2].
 
[1] https://issues.apache.org/jira/browse/FLINK-11458
[2] https://issues.apache.org/jira/browse/FLINK-12619
Best,
Congxian
 
 
zhisheng  于2020年1月10日周五 上午11:39写道:
 
> Hi, as I understand it, this setting controls whether the previous checkpoints are cleaned up
> when the job is cancelled, but that checkpoint is not necessarily the latest state of the job.
> If you trigger a savepoint with the cancel command, that state is the latest and most complete one.
>
> Best!
> zhisheng
>
> Px New <15701181132mr@gmail.com> 于2020年1月10日周五 上午10:58写道:
>
> > Hello, regarding your question, Flink does have a configuration for this: when the program is stopped, the checkpoint is additionally retained
> > -->
> >
> >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >
> >
> > lucas.wu  于2019年12月11日周三 上午11:56写道:
> >
> > > Hi all:
> > >
> > >
> >
> I'd like to discuss a question: why are Flink savepoints designed to be triggered manually? If no savepoint is taken when stopping the program, the previously saved state cannot be used when restarting. Why not follow Spark's approach: take checkpoints periodically, and then on startup just specify the checkpoint (ck) path to continue from where the last run left off.
> >
>


Re: flink savepoint checkpoint

2020-01-10 Thread Congxian Qiu
Hi
From Flink's point of view, a checkpoint is used to recover a job when a failover happens during
execution, while a savepoint is used to reuse state across jobs.
In addition, starting from 1.9 you can try StopWithSavepoint[1]; there is also another community
issue working on StopWithCheckpoint[2].

[1] https://issues.apache.org/jira/browse/FLINK-11458
[2] https://issues.apache.org/jira/browse/FLINK-12619
Best,
Congxian


zhisheng  于2020年1月10日周五 上午11:39写道:

> Hi, as I understand it, this setting controls whether the previous checkpoints are cleaned up
> when the job is cancelled, but that checkpoint is not necessarily the latest state of the job.
> If you trigger a savepoint with the cancel command, that state is the latest and most complete one.
>
> Best!
> zhisheng
>
> Px New <15701181132mr@gmail.com> 于2020年1月10日周五 上午10:58写道:
>
> > Hello, regarding your question, Flink does have a configuration for this: when the program is stopped, the checkpoint is additionally retained
> > -->
> >
> >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >
> >
> > lucas.wu  于2019年12月11日周三 上午11:56写道:
> >
> > > Hi all:
> > >
> > >
> >
> I'd like to discuss a question: why are Flink savepoints designed to be triggered manually? If no savepoint is taken when stopping the program, the previously saved state cannot be used when restarting. Why not follow Spark's approach: take checkpoints periodically, and then on startup just specify the checkpoint (ck) path to continue from where the last run left off.
> >
>


Re: Running Flink on java 11

2020-01-10 Thread Krzysztof Chmielewski
Hi,
Thank you for your answer. Btw it seems that you sent the reply only to my
address and not to the mailing list :)

I'm looking forward to trying out the 1.10 RC then.

Regarding the second thing you wrote, that
*"on Java 11, all the tests(including end to end tests) would be run with
Java 11 profile now."*
I'm not sure I fully get that. Did you mean that Flink builds currently run
on Java 11?

I was not rebuilding the Flink 1.9.1 sources with JDK 11. I just ran the 1.9.1
build on JRE 11 locally on my machine, and
I also modified the Job Cluster Dockerfile to use openjdk:13-jdk-alpine as a base
image instead of openjdk:8-jre-alpine.

Are there any users who are currently running Flink on Java 11 or higher?
What are your experiences?

Thanks,

pt., 10 sty 2020 o 03:14 Yangze Guo  napisał(a):

> Hi, Krzysztof
>
> Regarding the release-1.10, the community is now focus on this effort.
> I believe we will have our first release candidate soon.
>
> Regarding the issue when running Flink on Java 11, all the
> tests(including end to end tests) would be run with Java 11 profile
> now. If you meet any problem, feel free to open a new JIRA ticket or
> ask in user/dev ML.
>
> Best,
> Yangze Guo
>
> On Fri, Jan 10, 2020 at 1:11 AM KristoffSC
>  wrote:
> >
> > Hi guys,
> > well We have requirement in our project to use Java 11, although we would
> > really like to use Flink because it seems to match our needs perfectly.
> >
> > We were testing it on java 1.8 and all looks fine.
> > We tried to run it on Java 11 and also looks fine, at least for now.
> >
> > We were also running this as a Job Cluster, and since those images [1]
> are
> > based on openjdk:8-jre-alpine we switch to java 13-jdk-alpine. Cluster
> > started and submitted the job. All seemed fine.
> >
> > The Job and 3rd party library that this job is using were compiled with
> Java
> > 11.
> > I was looking for any posts related to java 11 issues and I've found this
> > [2] one.
> > We are also aware of ongoing FLINK-10725 [3] but this is assigned to 1.10
> > FLink version
> >
> > Having all of this, I would like to ask few questions
> >
> > 1. Is there any release date planed for 1.10?
> > 2. Are you aware of any issues regarding running Flink on Java 11?
> > 3. If my Job code would not use any code features from java 11, would
> flink
> > handle it when running on java 11? Or they are some internal
> functionalities
> > that would not be working on Java 11 (things that are using unsafe or
> > reflections?)
> >
> > Thanks,
> > Krzysztof
> >
> >
> > [1]
> >
> https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md
> > [2]
> >
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/UnsupportedOperationException-from-org-apache-flink-shaded-asm6-org-objectweb-asm-ClassVisitor-visit1-td28571.html
> > [3] https://issues.apache.org/jira/browse/FLINK-10725
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Null result cannot be used for atomic types

2020-01-10 Thread Jingsong Li
Hi sunfulin,

Looks like the error happened in the sink instead of the source.

Caused by: java.lang.NullPointerException: Null result cannot be used for
atomic types.

 at DataStreamSinkConversion$5.map(Unknown Source)


So the point is how you write to the sink. Can you share that code?
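
For reference only (an illustrative sketch, not your actual code): the generated
DataStreamSinkConversion typically comes from converting the Table result back to a
DataStream, and "Null result cannot be used for atomic types" usually shows up when
the target type is a single atomic type and the produced value is null, e.g.:

Table result = tableEnv.sqlQuery("SELECT event FROM " + getTableName());
// Converting to an atomic type such as String fails if the value is null:
DataStream<String> out = tableEnv.toAppendStream(result, String.class);

If that matches your sink side, converting to Row instead (toAppendStream(result,
Row.class)) or making sure the selected field is never null usually avoids the error.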


Best,

Jingsong Lee

On Fri, Jan 10, 2020 at 2:58 PM godfrey he  wrote:

> hi sunfulin,
>
> which flink version are you using ?
>
> best,
> godfrey
>
> sunfulin  于2020年1月10日周五 下午1:50写道:
>
>> Hi, I am running a Flink app while reading Kafka records with JSON
>> format. And the connect code is like the following:
>>
>>
>> tableEnv.connect(
>>
>> new Kafka()
>>
>> .version(kafkaInstance.getVersion())
>>
>> .topic(chooseKafkaTopic(initPack.clusterMode))
>>
>> .property("bootstrap.servers",
>> kafkaInstance.getBrokerList())
>>
>> .property("group.id", initPack.jobName)
>>
>> .startFromEarliest()
>>
>> ).withSchema(
>>
>> new Schema()
>>
>> // EVENT_TIME
>>
>> .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
>>
>> new Rowtime()
>>
>> .timestampsFromField("time")
>>
>> .watermarksPeriodicBounded(1000)
>>
>> )
>>
>> .field("type", Types.STRING)
>>
>> .field("event", Types.STRING)
>>
>> .field("user_id", Types.STRING)
>>
>> .field("distinct_id", Types.STRING)
>>
>> .field("project", Types.STRING)
>>
>> .field("recv_time", Types.SQL_TIMESTAMP)
>>
>> .field("properties", Types.ROW_NAMED(
>>
>> new String[] { "BROWSER_VERSION", "pathname",
>> "search", "eventType", "message", "stack", "componentStack" },
>>
>> Types.STRING, Types.STRING, Types.STRING,
>> Types.STRING, Types.STRING, Types.STRING, Types.STRING)
>>
>> )
>>
>> ).withFormat(
>>
>> new Json().failOnMissingField(false)
>>
>> .deriveSchema()
>>
>> )
>>
>> .inAppendMode()
>>
>> .registerTableSource(getTableName());
>>
>>
>>
>> However, the application throws the following Exception which really
>> confused me. From the code above, the field types are only *Types.STRING*
>> or *Types.SQL_TIMESTAMP. *
>>
>> *Not sure which data field can lead to this. Would appreciate some help from the
>> community.*
>>
>>
>> Caused by: java.lang.NullPointerException: Null result cannot be used for
>> atomic types.
>>
>>  at DataStreamSinkConversion$5.map(Unknown Source)
>>
>>  at
>> org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55)
>>
>>  at
>> org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34)
>>
>>  at
>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>
>>  at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>
>>  at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>
>>  at
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>
>>  at
>> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>>
>>  at
>> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>>
>>  at DataStreamSourceConversion$2.processElement(Unknown Source)
>>
>>  at
>> org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
>>
>>  at
>> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>
>>  at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>
>>  at org.apache.flink.streaming.
>>
>>
>>
>>
>>
>

-- 
Best, Jingsong Lee


Re: Elasticsink sometimes gives NoClassDefFoundError

2020-01-10 Thread Arvid Heise
I don't see a particular reason why you see this behavior. Chesnay's
explanation is the only plausible way that this behavior can happen.

I fear that without a specific log, we cannot help further.

On Fri, Jan 10, 2020 at 5:06 AM Jayant Ameta  wrote:

> Also, the ES version I'm using is 5.6.7
>
> Jayant
>
>
> On Thu, Jan 9, 2020 at 10:39 AM Jayant Ameta  wrote:
>
>> Hi,
>> The elastic connector is packaged in the uber jar that is submitted.
>> There is only 1 version of the connector:
>> flink-connector-elasticsearch5_2.11:1.7.1
>> I'm using Flink 1.7.1
>>
>> I couldn't figure out whether this error causes the job to fail, or
>> whether I see this error when the job is restarting after some other
>> failure.
>> But, the occurrence of this error and job restarts is correlated.
>>
>>
>> Jayant Ameta
>>
>>
>> On Wed, Jan 8, 2020 at 6:47 PM Arvid Heise  wrote:
>>
>>> Hi Jayant,
>>>
>>> if you only see it sometimes, that indicates that you have the class in two
>>> different versions of the connector on the classpath, where the class loader
>>> order is non-deterministic.
>>>
>>> Btw, it's always good to add which Flink version you use.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Wed, Jan 8, 2020 at 12:20 PM Jayant Ameta 
>>> wrote:
>>>
 Hi,
 I see the following error sometimes on my flink job, even though the
 class is present in my uber jar.

 java.lang.NoClassDefFoundError:
 org/apache/flink/streaming/connectors/elasticsearch5/shaded/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink$1
 at
 org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.connect(NioClientSocketPipelineSink.java:111)
 ... 17 common frames omitted Wrapped by:
 org.elasticsearch.ElasticsearchException: java.lang.NoClassDefFoundError:
 org/apache/flink/streaming/connectors/elasticsearch5/shaded/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink$1
 at
 org.elasticsearch.transport.netty3.Netty3Transport.exceptionCaught(Netty3Transport.java:325)
 at
 org.elasticsearch.transport.netty3.Netty3MessageChannelHandler.exceptionCaught(Netty3MessageChannelHandler.java:83)
 at
 org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
 at
 org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
 at
 org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
 ... 23 frames truncated


 Jayant

>>>


Re: How can I find out which key group belongs to which subtask

2020-01-10 Thread Zhijiang
Only chained operators can avoid the record serialization cost, but chaining cannot be 
applied across a keyed exchange (a keyBy breaks the chain).
If the downstream tasks are deployed in the same task manager as the upstream tasks, the 
network shuffle cost can be avoided, which still gives a performance benefit.
As far as I know, @Till Rohrmann has implemented some enhancements in the scheduler layer 
to support such a requirement in release-1.10. You can give it a try when the release 
candidate is ready. A small sketch of the key-group math is below.
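
If it helps, the key-to-subtask mapping can be computed with the same utility Flink uses
internally. A small sketch (the parallelism values are placeholders and must match your
job, and the key is the value returned by your KeySelector):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyToSubtask {
    public static void main(String[] args) {
        int maxParallelism = 128;  // the operator's max parallelism
        int parallelism = 6;       // the operator's actual parallelism
        Object key = "some-key";   // whatever your KeySelector returns

        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
        int subtaskIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, keyGroup);

        System.out.println("key group = " + keyGroup + ", subtask index = " + subtaskIndex);
    }
}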

Best,
Zhijiang


--
From:杨东晓 
Send Time:2020 Jan. 10 (Fri.) 02:10
To:Congxian Qiu 
Cc:user 
Subject:Re: How can I find out which key group belongs to which subtask

Thanks Congxian!
My purpose is not only to make the data go into the same subtask, but into the specific 
subtask that belongs to the same taskmanager as the upstream record. The key idea is 
to avoid shuffling between taskmanagers.
I think KeyGroupRangeAssignment.java explains a lot about how to get the key group and 
subtask context to make that happen.
Do you know whether serialization still happens while data is transferred 
between operators in the same taskmanager?
Thanks.
Congxian Qiu  于2020年1月9日周四 上午1:55写道:

Hi

If you just want to make sure some keys go into the same subtask, does a custom 
key selector[1] help?

For the key group and subtask information, you can refer to 
KeyGroupRangeAssignment[2] for more info, and for the max-parallelism logic you can 
refer to the doc[3].

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-key-selector-functions
[2] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
[3] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism

Best,
Congxian

杨东晓  于2020年1月9日周四 上午7:47写道:
Hi, I'm trying to do some optimization around a Flink 'keyBy' ProcessFunction. Is 
it possible to find out which key group a key belongs to, and 
essentially which subtask a key group belongs to?
The motivation is that we want to force the data records from upstream to go to a 
downstream subtask on the same taskmanager, which means that even if 
we use a keyed-stream function we still want no cross-JVM communication to happen 
at runtime.
And if we can achieve that, can we also avoid the expensive cost of record 
serialization, because data is only transferred within the same taskmanager JVM instance?

Thanks.