WebUI show custom config

2019-06-21 Thread Dominik Wosiński
Hey,
I am building jobs that use Typesafe Config under the hood. The configs
tend to grow big, so I was wondering whether there is a possibility to use
the WebUI to show the config that the job was run with. Currently the only
idea is to log the config and check it inside the logs, but with dozens of
jobs that is getting troublesome. Is there a better way to access a job's
custom configuration?

Thanks in advance,
Best,
Dom.


Re: Use Partitioner to forward messages to subtask by index

2019-06-21 Thread Ken Krugler
Hi Joshua,

It is possible, but fragile, as it depends on the internal code that Flink uses 
to calculate a hash for a key, and the max operator parallelism, etc.

See makeKeyForOperatorIndex, which will generate a String that can be used for 
partitioning to a specific subtask (operator index).
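A minimal sketch of the underlying idea, assuming Flink's KeyGroupRangeAssignment 
utility from flink-runtime (the internals such a helper has to rely on): brute-force 
a String key that Flink's own hashing routes to the desired operator index.

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyForSubtask {

    // Search for a String key that Flink's key-group hashing assigns to the
    // target subtask. Fragile by design: it depends on Flink internals and on
    // the max parallelism / operator parallelism staying unchanged.
    public static String keyForSubtask(int targetIndex, int maxParallelism, int parallelism) {
        for (int i = 0; ; i++) {
            String candidate = "key-" + i;
            int index = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                candidate, maxParallelism, parallelism);
            if (index == targetIndex) {
                return candidate;
            }
        }
    }
}

Keys generated this way can then be fed through keyBy to hit specific subtasks, with 
the caveats about restarts and rescaling mentioned above.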

— Ken

> On Jun 21, 2019, at 10:15 AM, Joshua Griffith  wrote:
> 
> Is it possible to use a custom Partitioner to forward messages to a 
> downstream subtask by the subtask's index? I realize that it would not be 
> safe to rely upon this across job restarts, but this does not affect my 
> particular application. I attempted to return a partition number identical to 
> the subtask number but this did not work. For example, assigning a message to 
> partition 1 would send it to subtask 0.
> 
> Thanks,
> Josh

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Use Partitioner to forward messages to subtask by index

2019-06-21 Thread Joshua Griffith
Is it possible to use a custom Partitioner to forward messages to a
downstream subtask by the subtask's index? I realize that it would not be
safe to rely upon this across job restarts, but this does not affect my
particular application. I attempted to return a partition number identical
to the subtask number but this did not work. For example, assigning a
message to partition 1 would send it to subtask 0.

Thanks,
Josh


Re: [ANNOUNCEMENT] June 2019 Bay Area Apache Flink Meetup

2019-06-21 Thread Xuefu Zhang
Hi all,

As the event is around the corner, please RSVP at meetup.com if you haven't
responded yet. Otherwise, I will see you next Wednesday, June 26.

Regards,
Xuefu

On Mon, Jun 10, 2019 at 7:50 PM Xuefu Zhang  wrote:

> Hi all,
>
> As promised, we planned to have quarterly Flink meetup and now it's about
> the time. Thus, I'm happy to announce that the next Bay Area Apache Flink
> Meetup [1] is scheduled on June 26 at Zendesk, 1019 Market Street in San
> Francisco.
>
> Schedule:
>
> 6:30 - 7:00 PM Networking and Refreshments
> 7:00 - 8:30 PM Short talks
>
> Many thanks go to Zendesk for their sponsorship. At the same time, we are
> open to 2 or 3 short talks. If interested, please let me know.
>
> Thanks,
>
> Xuefu
>
> [1] https://www.meetup.com/Bay-Area-Apache-Flink-Meetup/events/262216929
>>
>


Re: Dynamic planning of Flink job resources

2019-06-21 Thread 田志声
Hi, as for how many TMs to launch: as far as I know, YARN does not support 
automatically scaling the number of TMs. How many slots to start mainly depends on 
your parallelism. If your job needs that much parallelism to process the data, then 
you need that many slots to support it, because a job's parallelism is ultimately 
bounded by the slots you have; if there are not enough slots, the slot requests will 
fail. As for how to set the job's parallelism: before putting a Flink job you wrote 
into production, it is best to run a load test yourself to see whether the configured 
parallelism is reasonable and whether the job can process the data in time without a 
backlog. During the load test, watch how each operator is doing; some operators do 
heavier work and may not keep up, in which case you need to raise their parallelism. 
Also, if you consume data from an MQ topic with multiple partitions: if the data 
volume is small, one or two parallel instances may be enough and there is no need for 
more; if consumption cannot keep up, gradually increase the parallelism up to the 
number of partitions of the MQ topic and then check whether the job can consume in 
time (no backlog in the MQ).

There does not seem to be a concrete quantitative formula. In the whole tuning 
process, how to set the parallelism, the slots and how many TMs to allocate still has 
to be weighed against the data volume and the complexity of the processing logic of 
your actual business scenario. This is just my view; better suggestions are welcome.
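A minimal sketch of the knobs discussed above (a job-wide default parallelism plus a 
higher parallelism for one hypothetical heavy operator identified during load testing):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Job-wide default parallelism; it is bounded by the available slots
        // (number of TMs * slots per TM) on the cluster.
        env.setParallelism(4);

        env.fromElements("a", "b", "c")
            // Hypothetical heavy operator: give it a higher parallelism after
            // load testing shows it cannot keep up with the default.
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase();
                }
            }).setParallelism(8)
            .print();

        env.execute("parallelism-sketch");
    }
}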




from zhisheng

> On Jun 21, 2019, at 9:42 PM, 15904502...@163.com wrote:
> 
> 
> Hi all:
> Our Flink jobs run on YARN. How many TMs and slots a job starts with, as well as 
> resource settings such as the parallelism, are currently all configured based on 
> personal experience. I would like to ask which dimensions actually influence the 
> resource configuration of a Flink job, and whether these dimensions can be 
> quantified so that resources can be planned dynamically through some calculation. 
> What approach should be taken, so that less manual engineering involvement is 
> needed on the platform?
> Thanks, everyone
> 
> 
> 15904502...@163.com



Re: Unexpected behavior from interval join in Flink

2019-06-21 Thread Wouter Zorgdrager
Does anyone have some leads on this issue? I have been looking into the
IntervalJoinOperator code, but that didn't really help. My intuition is
that the data is rejected because of lateness, however that still confuses me
since I'm sure that both datastreams have monotonically increasing timestamps.
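A minimal sketch of one way to check the lateness theory (element type reduced to 
String here; in the real job it would be X and Y): put a ProcessFunction in front of 
each join input and side-output every element whose timestamp is already behind the 
current watermark.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class LatenessProbe {

    // Side-output tag for elements that are already behind the watermark when
    // they reach this operator.
    public static final OutputTag<String> LATE = new OutputTag<String>("late") {};

    public static SingleOutputStreamOperator<String> probe(DataStream<String> in) {
        return in.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                Long timestamp = ctx.timestamp();
                long watermark = ctx.timerService().currentWatermark();
                if (timestamp != null && timestamp < watermark) {
                    // Behind the watermark: a candidate for being silently
                    // dropped by the interval join.
                    ctx.output(LATE, value);
                } else {
                    out.collect(value);
                }
            }
        });
    }
}

Attaching the probe after the timestamp assignment on both inputs and inspecting 
probe(...).getSideOutput(LATE) should show whether lateness is really the cause.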

Thx, Wouter

On Mon, Jun 17, 2019 at 13:20 Wouter Zorgdrager <w.d.zorgdra...@tudelft.nl> wrote:

> Hi all,
>
> I'm experiencing some unexpected behavior using an interval join in Flink.
> I'm dealing with two data sets, let's call them X and Y. They are finite
> (10k elements) but I interpret them as a DataStream. The data needs to be
> joined for enrichment purposes. I use event time and I know (because I
> generated the data myself) that the timestamp of an element Y is always
> between -60 minutes and +30 minutes of the element with the same key in set
> X. Both datasets are in-order (in terms of timestamps), equal in size,
> share a common key and parallelism is set to 1 throughout the whole program.
>
> The code to join looks something like this:
>
> xStream
>   .assignAscendingTimestamps(_.date.getTime)
>   .keyBy(_.commonKey)
>   .intervalJoin(
> yStream
>   .assignAscendingTimestamps(_.date.getTime)
>   .keyBy(_.commonKey))
>   .between(Time.minutes(-60), Time.minutes(30))
>   .process(new ProcessJoinFunction[X, Y, String] {
> override def processElement(
> left: X,
> right: Y,
> ctx: ProcessJoinFunction[X, Y, String]#Context,
> out: Collector[String]): Unit = {
>
>   out.collect(left + ":" + right)
> }
>   })
>
>
> However, about 30% of the data is not joined. Is there a proper
> way to debug this? For instance, in windows you can side-output late data.
> Is there a possibility to side-output unjoinable data?
>
> Thx a lot,
> Wouter
>
>
>


Re: Idle windows

2019-06-21 Thread Hequn Cheng
Hi Ustinov,

I guess you have mixed up the concepts of remainder and parallelism,
i.e., data with remainder 0 will not necessarily be processed by the 0th
task after keyBy.
Flink applies a hash function to the key you have provided and
partitions the record based on the key group range.

KeyBy makes sure that the same key goes to the same place; if you want to
balance the workload, you need to have more distinct keys.
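As an illustration (a minimal sketch, not the exact job): keying directly on the 
cookie string lets Flink's own hashing spread the distinct cookies over all key 
groups, instead of pre-hashing them down to a handful of remainder values.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class KeyByCookieSketch {

    // Key by the raw cookie value itself. With many distinct cookies, Flink's
    // internal hashing spreads them over all key groups, so every parallel
    // window instance receives data; a small set of precomputed remainders can
    // only ever reach a small set of subtasks.
    public static KeyedStream<String, String> keyByCookie(DataStream<String> cookies) {
        return cookies.keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String cookie) {
                return cookie;
            }
        });
    }
}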

Best, Hequn


On Fri, Jun 21, 2019 at 6:23 PM Ustinov Anton  wrote:

> I have a simple job that reads JSON messages from a Kafka topic and
> processes them like this:
>
> SingleOutputStreamOperator result = ds
> .filter(ev -> ev.has(cookieFieldName))
> .map(ev -> ev.get(cookieFieldName).asText())
> .keyBy(new CookieKeySelector(env.getParallelism()))
> .timeWindow(Time.seconds(period))
> .aggregate(new CookieAggregate())
> .timeWindowAll(Time.seconds(period))
> .reduce((v1, v2) -> v1 + v2);
>
> CookieKeySelector computes an MD5 hash of the cookie value and takes the
> remainder of dividing it by the job parallelism. CookieAggregate counts unique
> cookie values in a window. I see in the Flink Dashboard that only half of the
> windows are getting messages to process. The number of working windows depends
> on the job parallelism. Why do only some of the windows compute useful aggregates?
> I’ve tried to use random numbers as a key and still get the same result.
>
> Additional information: Flink 1.8.0, runs on a single node with 56 CPUs,
> 256G RAM, 10GB/s network.
>
>
> Anton Ustinov
> ustinov@gmail.com
>
>


[FLINK-10868] the job cannot be exited immediately if job manager is timed out

2019-06-21 Thread Young
Hi ZhenQiu & trohrmann:


I have currently backported FLINK-10868 to Flink 1.5. Most of my jobs (all batch 
jobs) now exit immediately once the number of failed container requests reaches the 
upper limit, but there are still some jobs that cannot exit immediately. From the 
logs, it can be observed that for these jobs the job manager times out first for 
unknown reasons, and code segment 1 executes after the job manager has timed out but 
before it is reconnected. So I suspect the job manager registration is out of sync 
and that the notifyAllocationFailure call in code segment 2 is never executed.


I'm wondering whether you have encountered similar problems. Is there a solution? To 
solve the problem that the job cannot exit immediately, I am currently considering 
calling the onFatalError() method to exit the process immediately when 
jobManagerRegistration == null; it is still unclear whether this drastic approach has 
any side effects.


Thanks,
Young


code segment 1 in ResourceManager.java:

private void cancelAllPendingSlotRequests(Exception cause) {
    slotManager.cancelAllPendingSlotRequests(cause);
}


code segment 2 in ResourceManager.java:

public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
    validateRunsInMainThread();
    log.info("Slot request with allocation id {} for job {} failed.", allocationId, jobId, cause);

    JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
    if (jobManagerRegistration != null) {
        jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
    }
}
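For illustration, a sketch of how the considered change could look inside code 
segment 2 (not a vetted fix; whether calling onFatalError() here is safe is exactly 
the open question):

JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
if (jobManagerRegistration != null) {
    jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
} else {
    // The job manager registration is gone (e.g. it timed out earlier): without
    // this branch the allocation failure is silently dropped and the job hangs.
    // Exiting via onFatalError() is the drastic workaround discussed above.
    onFatalError(new FlinkException(
        "Allocation failure for job " + jobId + " but no job manager is registered.", cause));
}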







Idle windows

2019-06-21 Thread Ustinov Anton
I have a simple job that reads JSON messages from a Kafka topic and processes 
them like this:

SingleOutputStreamOperator result = ds
.filter(ev -> ev.has(cookieFieldName))
.map(ev -> ev.get(cookieFieldName).asText())
.keyBy(new CookieKeySelector(env.getParallelism()))
.timeWindow(Time.seconds(period))
.aggregate(new CookieAggregate())
.timeWindowAll(Time.seconds(period))
.reduce((v1, v2) -> v1 + v2);

CookieKeySelector computes an MD5 hash of the cookie value and takes the remainder 
of dividing it by the job parallelism. CookieAggregate counts unique cookie values in 
a window. I see in the Flink Dashboard that only half of the windows are getting 
messages to process. The number of working windows depends on the job parallelism. 
Why do only some of the windows compute useful aggregates? I’ve tried to use random 
numbers as a key and still get the same result.

Additional information: Flink 1.8.0, runs on a single node with 56 CPUs, 256G 
RAM, 10GB/s network.


Anton Ustinov
ustinov@gmail.com 



Re: Maybe a flink bug. Job keeps in FAILING state

2019-06-21 Thread zhijiang
Thanks for the reminder, @Chesnay Schepler.

I just looked through the related logs. Actually all five "Source: 
ServiceLog" tasks are not in a terminal state from the JM's view; the relevant 
process is as follows:

1. The checkpoint in the task causes an OOM issue, which calls 
`Task#failExternally` as a result; we can see the log "Attempting to fail 
task externally" in the TM.
2. The source task transitions from RUNNING to FAILED and then starts 
a canceler thread for canceling the task; we can see the log "Triggering cancellation 
of task" in the TM.
3. When the JM starts to cancel the source tasks, the RPC call 
`Task#cancelExecution` finds the task already in FAILED state as in step 2; 
we can see the log "Attempting to cancel task" in the TM.

In the end, none of the five source tasks reach a terminal state in the JM log. I 
guess step 2 might not create the canceler thread successfully, because the root 
failover was caused by an OOM while creating a native thread in step 1, so it is 
possible that creating the canceler thread also fails in this unstable OOM situation. 
If so, the source task would not be interrupted at all and would never report to the 
JM, even though its state had already changed to FAILED.

The other vertex tasks do not trigger `Task#failExternally` in step 1 and only 
receive the cancel RPC from the JM in step 3. I guess that by this later point the 
canceler threads could be created successfully after some GCs, so these tasks could 
be canceled and reported to the JM side.

I think the key problem is that under OOM some behaviors are not within 
expectations, which can cause problems like this. Maybe we should handle OOM errors 
in a more extreme way, such as making the TM exit, to solve the potential issue.

Best,
Zhijiang
--
From:Chesnay Schepler 
Send Time: Friday, June 21, 2019 16:34
To:zhijiang ; Joshua Fan 
Cc:user ; Till Rohrmann 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

The logs are attached to the initial mail.

 Echoing my thoughts from earlier: from the logs it looks as if the TM never 
even submits the terminal state RPC calls for several tasks to the JM.

 On 21/06/2019 10:30, zhijiang wrote:
Hi Joshua,

If the tasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) were really in 
CANCELED state on the TM side but in CANCELING state on the JM side, then it might 
indicate that the terminal state RPC was not received by the JM. I am not sure 
whether the OOM could cause this to happen, resulting in unexpected behavior.

In addition, you mentioned these tasks are still active after the OOM and after 
being asked to cancel, so I am not sure what specific period your attached TM stack 
covers. It might help if you could provide the corresponding TM log and JM log. 
From the TM log it is easy to check the task's final state. 

Best,
Zhijiang
--
From:Joshua Fan 
Send Time: Thursday, June 20, 2019 11:55
To:zhijiang 
Cc:user ; Till Rohrmann ; Chesnay 
Schepler 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

zhijiang 

I did not capture the job UI. The topology is in FAILING state, but the 
persistentbolt subtasks, as can be seen in the picture attached in the first mail, 
were all canceled. The parsebolt subtasks, as described before, had one 
subtask FAILED and the other subtasks CANCELED, but the source subtasks had one 
subtask (subtask 4/5) CANCELED and the other subtasks (subtask 1/5, subtask 
2/5, subtask 3/5, subtask 5/5) CANCELING, i.e. not in a terminal state.

The subtask status described above is from the JM's view, but in the TM's view all 
of the source subtasks were FAILED; I do not know why the JM was not notified about 
this.

As all of the failed statuses were triggered by an OOM (the subtask could not 
create a native thread when checkpointing), I also dumped the stack of the JVM. It 
shows the four subtasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) are 
still active after the OOM was thrown and cancel was called. I attached the 
jstack file in this email.

Yours sincerely
Joshua  
On Wed, Jun 19, 2019 at 4:40 PM zhijiang  wrote:
As long as one task is in canceling state, then the job status might be still 
in canceling state.

@Joshua Do you confirm all of the tasks in topology were already in terminal 
state such as failed or canceled?

Best,
Zhijiang
--
From:Chesnay Schepler 
Send Time: Wednesday, June 19, 2019 16:32
To:Joshua Fan ; user ; Till 
Rohrmann 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

 @Till have you seen something like this before? Despite all source tasks 
 reaching a terminal state on a TM (FAILED), it does not send updates to 
 the JM for all of them, but only a single one.

 On 18/06/2019 12:14, Joshua Fan wrote:
 > Hi All,
 > There is a topology of 3 operator, such as, source, parser, and 
 > persist. Occasionally, 5 subtasks of the source encounters 

Re: [External] checkpoint metadata not in configured directory state.checkpoints.dir

2019-06-21 Thread Congxian Qiu
Hi Vishal

Maybe the slide[1] (page 40) can be helpful
[1] https://files.alicdn.com/tpsservice/c421720fcb1c51026257cd770923844a.pdf
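Regarding the automation question further down in the thread, a minimal sketch 
assuming the checkpoint root is on a filesystem path visible to the client (for S3 
you would list the keys through the S3 API instead): pick the chk-<n> directory with 
the highest n that contains a _metadata file, and resume with `flink run -s <that 
path>`.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

public class LatestCheckpointFinder {

    // Returns the retained checkpoint directory with the highest chk-<n> under
    // the given root (e.g. <root>/<job-id>), skipping directories without a
    // _metadata file. The result can be passed to `flink run -s <path>`.
    public static Optional<Path> latestCheckpoint(Path checkpointRoot) throws IOException {
        try (Stream<Path> dirs = Files.list(checkpointRoot)) {
            return dirs
                .filter(Files::isDirectory)
                .filter(p -> p.getFileName().toString().startsWith("chk-"))
                .filter(p -> Files.exists(p.resolve("_metadata")))
                .max(Comparator.comparingLong(
                    p -> Long.parseLong(p.getFileName().toString().substring(4))));
        }
    }
}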
Best,
Congxian


Vishal Sharma  wrote on Thursday, June 20, 2019 at 5:42 PM:

> Hi Congxian,
>
> I am not sure how I can track the checkpoint path. Can you give a suggestion
> regarding this?
>
> Thanks,
> Vishal Sharma
>
> On Thu, Jun 20, 2019 at 11:17 AM Congxian Qiu 
> wrote:
>
>> Hi, Vishal
>> If you want to restart from the last completed external checkpoint of the
>> previous stopped job, you need to track the checkpoint path and restart from
>> it.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
>>
>> Best,
>> Congxian
>>
>>
>> Vishal Sharma  wrote on Wednesday, June 19, 2019 at 11:38 PM:
>>
>>> Hi Chesnay,
>>>
>>> Can you suggest how I should go about automating job restart from the last
>>> completed externalised checkpoint in case of failure? I am not sure about
>>> the path for the latest completed checkpoint.
>>>
>>> Thanks,
>>> Vishal Sharma
>>>
>>> On Wed, Jun 19, 2019 at 11:11 PM Chesnay Schepler 
>>> wrote:
>>>
 The _metadata is always stored in the same directory as the checkpoint
 data.

 As outlined here, "state.checkpoints.dir" serves as a cluster-wide configuration
 that _can_ be overwritten with a job-specific setting when creating the state-backend.

 If you want the state-backend to use the configured directory you must
 configure the state-backend in the configuration as well, as outlined here.

 On 19/06/2019 16:26, Vishal Sharma wrote:

 Hi Folks,

 I am using flink 1.8 with externalised checkpointing enabled and saving
 the checkpoints to aws S3.

 My configuration is as follows :

 flink-conf.yaml :
 state.checkpoints.dir: s3a://test-bucket/checkpoint-metadata

 In application code :
 env.setStateBackend(new
 RocksDBStateBackend("s3a://test-bucket/checkpoints", true))

 As per my understanding, the externalized checkpoint’s meta data is
 determined from the configuration key "state.checkpoints.dir" and
 checkpoint data is stored in state backend path.

 However, in my case, I don't see anything in the metadata directory.
 The _metadata file is present inside each of the checkpoint directories (chk-6043
 ...).

 Is this the expected behavior? If yes, what is the use of the
 "state.checkpoints.dir" configuration?

 My goal is to establish a process to automatically restart the job from the
 last completed externalised checkpoint in case of failure. For this
 to happen, I need to be able to figure out the path of the metadata of the latest
 checkpoint.

 Thanks,
 Vishal Sharma





Re: Maybe a flink bug. Job keeps in FAILING state

2019-06-21 Thread Chesnay Schepler

The logs are attached to the initial mail.

Echoing my thoughts from earlier: from the logs it looks as if the TM 
never even submits the terminal state RPC calls for several tasks to the JM.


On 21/06/2019 10:30, zhijiang wrote:

Hi Joshua,

If the tasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) were 
really in CANCELED state on the TM side but in CANCELING state on the JM 
side, then it might indicate that the terminal state RPC was not received 
by the JM. I am not sure whether the OOM could cause this to happen, 
resulting in unexpected behavior.


In addition, you mentioned these tasks are still active after the OOM and 
after being asked to cancel, so I am not sure what specific period your 
attached TM stack covers. It might help if you could provide the 
corresponding TM log and JM log.

From the TM log it is easy to check the task's final state.

Best,
Zhijiang

--
From:Joshua Fan 
Send Time: Thursday, June 20, 2019 11:55
To:zhijiang 
Cc:user ; Till Rohrmann
; Chesnay Schepler 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

zhijiang

I did not capture the job UI. The topology is in FAILING state,
but the persistentbolt subtasks, as can be seen in the picture
attached in the first mail, were all canceled. The parsebolt
subtasks, as described before, had one subtask FAILED and the other
subtasks CANCELED, but the source subtasks had one subtask (subtask
4/5) CANCELED and the other subtasks (subtask 1/5, subtask 2/5, subtask
3/5, subtask 5/5) CANCELING, i.e. not in a terminal state.

The subtask status described above is from the JM's view, but from the TM's
view all of the source subtasks were FAILED; I do not know why the JM was
not notified about this.

As all of the failed statuses were triggered by an OOM (the subtask could
not create a native thread when checkpointing), I also dumped the
stack of the JVM. It shows the four subtasks (subtask 1/5, subtask
2/5, subtask 3/5, subtask 5/5) are still active after the OOM was
thrown and cancel was called. I attached the jstack file in this
email.

Yours sincerely
Joshua

On Wed, Jun 19, 2019 at 4:40 PM zhijiang  wrote:
As long as one task is in canceling state, then the job status
might be still in canceling state.

@Joshua Do you confirm all of the tasks in topology were already
in terminal state such as failed or canceled?

Best,
Zhijiang
--
From:Chesnay Schepler 
Send Time: Wednesday, June 19, 2019 16:32
To:Joshua Fan ; user ; Till Rohrmann 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

@Till have you seen something like this before? Despite all source tasks
reaching a terminal state on a TM (FAILED), it does not send updates to
the JM for all of them, but only a single one.

On 18/06/2019 12:14, Joshua Fan wrote:
> Hi All,
> There is a topology of 3 operators: source, parser, and
> persist. Occasionally, 5 subtasks of the source encounter exceptions
> and turn to FAILED; at the same time, one subtask of the parser runs
> into an exception and turns to FAILED too. The jobmaster gets a message
> about the parser's failure. The jobmaster then tries to cancel all the
> subtasks; most of the subtasks of the three operators turn to CANCELED
> except the 5 subtasks of the source, because the state of those 5
> is already FAILED before the jobmaster tries to cancel them. Then the
> jobmaster cannot reach a final state but stays in FAILING state,
> while the subtasks of the source stay in CANCELING state.
>
> The job runs on a Flink 1.7 cluster on YARN, and there is only one TM
> with 10 slots.
>
> The attached files contain a jm log, a tm log and the UI picture.
>
> The exception timestamp is about 2019-06-16 13:42:28.
>
> Yours
> Joshua






Re: Maybe a flink bug. Job keeps in FAILING state

2019-06-21 Thread zhijiang
Hi Joshua,

If the tasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) were really in 
CANCELED state on the TM side but in CANCELING state on the JM side, then it might 
indicate that the terminal state RPC was not received by the JM. I am not sure 
whether the OOM could cause this to happen, resulting in unexpected behavior.

In addition, you mentioned these tasks are still active after the OOM and after 
being asked to cancel, so I am not sure what specific period your attached TM stack 
covers. It might help if you could provide the corresponding TM log and JM log. 
From the TM log it is easy to check the task's final state. 

Best,
Zhijiang
--
From:Joshua Fan 
Send Time: Thursday, June 20, 2019 11:55
To:zhijiang 
Cc:user ; Till Rohrmann ; Chesnay 
Schepler 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

zhijiang

I did not capture the job UI. The topology is in FAILING state, but the 
persistentbolt subtasks, as can be seen in the picture attached in the first mail, 
were all canceled. The parsebolt subtasks, as described before, had one 
subtask FAILED and the other subtasks CANCELED, but the source subtasks had one 
subtask (subtask 4/5) CANCELED and the other subtasks (subtask 1/5, subtask 
2/5, subtask 3/5, subtask 5/5) CANCELING, i.e. not in a terminal state.

The subtask status described above is from the JM's view, but in the TM's view all 
of the source subtasks were FAILED; I do not know why the JM was not notified about 
this.

As all of the failed statuses were triggered by an OOM (the subtask could not 
create a native thread when checkpointing), I also dumped the stack of the JVM. It 
shows the four subtasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) are 
still active after the OOM was thrown and cancel was called. I attached the 
jstack file in this email.

Yours sincerely
Joshua
On Wed, Jun 19, 2019 at 4:40 PM zhijiang  wrote:
As long as one task is in canceling state, then the job status might be still 
in canceling state.

@Joshua Do you confirm all of the tasks in topology were already in terminal 
state such as failed or canceled?

Best,
Zhijiang
--
From:Chesnay Schepler 
Send Time: Wednesday, June 19, 2019 16:32
To:Joshua Fan ; user ; Till 
Rohrmann 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

@Till have you seen something like this before? Despite all source tasks 
reaching a terminal state on a TM (FAILED), it does not send updates to 
the JM for all of them, but only a single one.

On 18/06/2019 12:14, Joshua Fan wrote:
> Hi All,
> There is a topology of 3 operators: source, parser, and 
> persist. Occasionally, 5 subtasks of the source encounter exceptions 
> and turn to FAILED; at the same time, one subtask of the parser runs 
> into an exception and turns to FAILED too. The jobmaster gets a message 
> about the parser's failure. The jobmaster then tries to cancel all the 
> subtasks; most of the subtasks of the three operators turn to CANCELED 
> except the 5 subtasks of the source, because the state of those 5 
> is already FAILED before the jobmaster tries to cancel them. Then the 
> jobmaster cannot reach a final state but stays in FAILING state, 
> while the subtasks of the source stay in CANCELING state.
>
> The job runs on a Flink 1.7 cluster on YARN, and there is only one TM 
> with 10 slots.
>
> The attached files contain a jm log, a tm log and the UI picture.
>
> The exception timestamp is about 2019-06-16 13:42:28.
>
> Yours
> Joshua




Re: dynamic metric

2019-06-21 Thread Till Rohrmann
Yes, this could be an option. Or you simply have a flag for it.
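For illustration, a minimal sketch of the flag/index idea with contextual labels 
attached via a metric group (the error-type group name and the per-record logic are 
hypothetical):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class ErrorCountingMapper extends RichMapFunction<String, String> {

    // Index of already-registered metrics, so each label gets its counter
    // registered only once even though registration happens lazily in map().
    private transient Map<String, Counter> errorCounters;

    @Override
    public void open(Configuration parameters) {
        errorCounters = new HashMap<>();
    }

    @Override
    public String map(String value) {
        try {
            return doProcess(value);
        } catch (Exception e) {
            // Register the counter for this error type on first use; the
            // "errorType" group plays the role of a contextual label.
            String errorType = e.getClass().getSimpleName();
            errorCounters
                .computeIfAbsent(errorType, t -> getRuntimeContext()
                    .getMetricGroup()
                    .addGroup("errorType", t)
                    .counter("errors"))
                .inc();
            return value;
        }
    }

    private String doProcess(String value) throws Exception {
        return value; // placeholder for the real per-record logic
    }
}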

On Fri, Jun 21, 2019, 01:47 David Morin  wrote:

> Thanks Till.
> Ok, I've got it.
> So, to prevent from register the metric twice I have to keep an index
> (Hasmap for example) to check if the metric already exists ?
>
> On Fri, Jun 21, 2019 at 01:27, Till Rohrmann  wrote:
>
>> Hi David,
>>
>> I think it is not strictly required that you register the metric in the
>> open method. It is just convenient because otherwise you have to make sure
>> that you register the metric only once (e.g. when doing it in the map
>> function).
>>
>> What you need in order to register a metric is the runtime context which
>> you get if you implement a RichFunction:
>>
>> getRuntimeContext().getMetricGroup().gauge("MyGauge", new Gauge<Integer>() {
>>   @Override
>>   public Integer getValue() {
>>     return valueToExpose;
>>   }
>> });
>>
>> Cheers,
>> Till
>>
>> On Fri, Jun 21, 2019 at 12:36 AM David Morin 
>> wrote:
>>
>>> Hi,
>>>
>>> I want to create one metric related to the number of errors, but I would
>>> also like to add some contextual labels.
>>> What is the best way to do that? A gauge?
>>> How can I create this kind of metric dynamically during the run of the task
>>> (registering in the open method is not possible because it is too early)?
>>> Thanks in advance
>>>
>>> David
>>>
>>


Re: CoFlatMapFunction vs BroadcastProcessFunction

2019-06-21 Thread Dawid Wysakowicz
Hi Andy,

I think the second link you posted describes the differences between the
CoFlatMap and BroadcastStream approaches very well. I will try to summarize
them again. There are two main differences:

1. With CoFlatMap either both inputs are keyed or none is. You cannot
have one input keyed and the other one non-keyed. This also means the
state will be stored in operator state.

2. The second difference is how the state is handled. The broadcast
stream implements the broadcast state handling for you. It will ensure
that upon restore all operators will have the same broadcast state, even
if you rescale. I think you can say in this approach not only the stream
is broadcasted, but also the state is broadcasted (on the broadcast side).
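As an illustration of the broadcast-state approach (a minimal sketch with the 
rule/event types reduced to Strings; names like "rules" are hypothetical), the 
broadcast side writes the state through the descriptor and the keyed side reads it:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastSketch {

    // Descriptor for the broadcast ("config") state: rule name -> rule payload.
    static final MapStateDescriptor<String, String> RULES = new MapStateDescriptor<>(
        "rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    public static DataStream<String> apply(KeyedStream<String, String> events,
                                           DataStream<String> configStream) {
        BroadcastStream<String> broadcastConfig = configStream.broadcast(RULES);

        return events
            .connect(broadcastConfig)
            .process(new KeyedBroadcastProcessFunction<String, String, String, String>() {

                @Override
                public void processElement(String event, ReadOnlyContext ctx,
                                           Collector<String> out) throws Exception {
                    // The event side only reads the broadcast state.
                    String rule = ctx.getBroadcastState(RULES).get("default");
                    out.collect(event + " handled with rule " + rule);
                }

                @Override
                public void processBroadcastElement(String rule, Context ctx,
                                                    Collector<String> out) throws Exception {
                    // The config side updates the broadcast state; Flink keeps it
                    // identical across parallel instances, also after rescaling.
                    ctx.getBroadcastState(RULES).put("default", rule);
                }
            });
    }
}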

As for the example you linked, I don't know enough about it to say
anything about what it does and why it does it that way.

Best,

Dawid

On 20/06/2019 12:39, Andy Hoang wrote:

> Hi guys,
>
> I read about
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-Rule-Evaluation-in-Flink-td21125.html#a21241
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/BroadcastStream-vs-Broadcasted-DataStream-td23712.html
>
> I tried to use those 2 classes for my problem: One stream as config
> stream to change behavior on another event
> stream 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Weird-behavior-with-CoFlatMapFunction-td28140.html
>  (solved,
> thanks to Fabian)
>
> The two implementations are basically the same; in each class we have
> to implement two methods:
> flatmap1 vs processElement: process the “event” stream 
> flatmap2 vs processBroadcastElement: process the “config” stream
>
> While those implementations are quite similar, I’m not sure which one I
> should pick.
> My gut makes me feel like I haven’t harnessed all the angles
> of BroadcastProcessFunction yet. I’m curious in which situations we
> should use this class, because even with the example in the doc
> page: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/broadcast_state.html#important-considerations
> we can still use CoFlatMapFunction to do it.
>
> There is another sample I got from Google which uses
> both: 
> https://gist.github.com/syhily/932e0d1e0f12b3e951236d6c36e5a7ed#file-twostreamingjoining-scala-L178
> I haven’t figured out what it is trying to do:
>
> rule.broadcast.connect…
> and connect again
> .connect(broadcastStream)
>
> Maybe this is the missing piece that I haven’t understood
> about BroadcastProcessFunction.
>
> I hope you guys can point me in some direction on how/when I should choose
> which class.
>
> Thanks,
>
> Andy
>


