[jira] [Commented] (FLINK-13848) Support “scheduleAtFixedRate/scheduleAtFixedDelay” in RpcEndpoint#MainThreadExecutor

2019-08-27 Thread Xiaogang Shi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16916617#comment-16916617
 ] 

Xiaogang Shi commented on FLINK-13848:
--

[~SleePy] Sorry, I misunderstood your second question.

You are right that we can use the delayed scheduling of {{MainThreadExecutor}} to 
achieve periodic scheduling. But it requires two messages to be sent to the actor 
for each triggering, which may affect throughput when the RPC endpoint is 
flooded.

> Support “scheduleAtFixedRate/scheduleAtFixedDelay” in 
> RpcEndpoint#MainThreadExecutor
> 
>
> Key: FLINK-13848
> URL: https://issues.apache.org/jira/browse/FLINK-13848
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently the methods "scheduleAtFixedRate/scheduleAtFixedDelay" of 
> {{RpcEndpoint#MainThreadExecutor}} are not implemented, because there was no 
> requirement for them before.
> Now we are planning to implement these methods to support periodic checkpoint 
> triggering.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13848) Support “scheduleAtFixedRate/scheduleAtFixedDelay” in RpcEndpoint#MainThreadExecutor

2019-08-27 Thread Xiaogang Shi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16916607#comment-16916607
 ] 

Xiaogang Shi commented on FLINK-13848:
--

1. Yes. If the task message has already been sent to the actor's mailbox, it's very 
difficult to prevent it from being executed by the actor. Using a boolean flag 
accessible through the returned {{ScheduledFuture}} may be a solution, but that 
makes cancellation very complicated.
2. Most RPC endpoints are already equipped with a {{ScheduledExecutorService}} 
which can be used to achieve periodic triggering. If you take a look at the 
implementation, {{ScheduledExecutorService}} implements periodic scheduling 
by scheduling the next triggering after the completion of the current one.
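To illustrate point 2, here is a minimal, self-contained sketch (plain Java, not Flink code; all names are illustrative) of how fixed-delay periodic triggering can be built from one-shot delayed scheduling, by submitting the next run only after the current one completes:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedDelaySketch {
    // Illustrative only: periodic triggering implemented by scheduling the
    // next run after the current one finishes, mirroring what
    // ScheduledExecutorService does internally for scheduleWithFixedDelay.
    static void scheduleWithFixedDelay(ScheduledExecutorService ses,
                                       Runnable task, long delayMs,
                                       AtomicInteger remaining,
                                       CountDownLatch done) {
        ses.schedule(() -> {
            task.run();
            if (remaining.decrementAndGet() > 0) {
                // reschedule only after completion of the current triggering
                scheduleWithFixedDelay(ses, task, delayMs, remaining, done);
            } else {
                done.countDown();
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger count = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        scheduleWithFixedDelay(ses, count::incrementAndGet, 10,
                               new AtomicInteger(3), done);
        done.await();
        ses.shutdown();
        if (count.get() != 3) throw new AssertionError("expected 3 runs, got " + count.get());
        System.out.println("runs=" + count.get());
    }
}
```

Note how this needs only one delayed submission per triggering, unlike the two messages per triggering discussed above.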



[jira] [Commented] (FLINK-13848) Support “scheduleAtFixedRate/scheduleAtFixedDelay” in RpcEndpoint#MainThreadExecutor

2019-08-26 Thread Xiaogang Shi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16915689#comment-16915689
 ] 

Xiaogang Shi commented on FLINK-13848:
--

Hi [~SleePy], what's your detailed plan for implementing these methods?

We once attempted to implement these methods in our private branches. I am very 
happy to share some experience here.

Our implementation was similar to that in Java's {{ScheduledThreadPoolExecutor}}, 
except that new tasks were submitted via Akka's dispatcher. One problem we 
encountered is that we could only cancel the scheduling of task messages in Akka, 
but failed to cancel those task messages already sent. As a result, some tasks were 
still executed after the {{ScheduledFuture}} was cancelled, leading to some 
weird concurrent behaviors.

Given that Akka's dispatcher brings few benefits, we decided to use a 
non-main-thread {{ScheduledExecutorService}} together with {{runAsync}} to 
achieve periodic triggering in the main thread. The new implementation suffers from 
the same problem, but is much simpler.
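A minimal sketch of that approach (plain Java; a single-threaded executor stands in for the endpoint's main-thread executor, and all names are illustrative, not Flink's actual classes). It also shows the residual cancellation problem: a run already handed off to the main thread may still execute after the periodic schedule is cancelled:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicRunAsyncSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical stand-in for the endpoint's main-thread executor.
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        // Separate, non-main-thread timer.
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger triggers = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(3);

        // The timer fires off the main thread; each firing hands the actual
        // work over to the main thread, mimicking runAsync.
        ScheduledFuture<?> periodic = timer.scheduleAtFixedRate(
            () -> mainThread.execute(() -> {
                triggers.incrementAndGet();
                done.countDown();
            }),
            0, 10, TimeUnit.MILLISECONDS);

        done.await();
        // Cancelling stops future firings, but a run already handed off to
        // the main thread may still execute -- the "same problem" above.
        periodic.cancel(false);
        timer.shutdown();
        mainThread.shutdown();
        mainThread.awaitTermination(1, TimeUnit.SECONDS);
        if (triggers.get() < 3) throw new AssertionError();
        System.out.println("triggered at least 3 times");
    }
}
```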



[jira] [Commented] (FLINK-13732) Enhance JobManagerMetricGroup with FLIP-6 architecture

2019-08-15 Thread Xiaogang Shi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16907856#comment-16907856
 ] 

Xiaogang Shi commented on FLINK-13732:
--

[~SleePy] Thanks for bringing up this issue. 

We are also suffering from the confusing "job manager metrics". It would be nice if 
we could separate the legacy {{JobManagerMetricGroup}} into {{DispatcherMetricGroup}}, 
{{ResourceManagerMetricGroup}}, and {{JobManagerMetricGroup}}, and distinguish 
them with cluster ids.

But an interesting question here is the collection of process metrics (e.g., 
CPU, memory, I/O, and threads). Currently this is not a problem, as Flink does 
not collect any process metrics. But from our experience, these process metrics 
are very helpful in monitoring and troubleshooting. 

Admittedly, whether we should collect process metrics is a separate question. 
But in case we do, it will be a question in which metric group we collect the 
process metrics of job managers. 

> Enhance JobManagerMetricGroup with FLIP-6 architecture
> --
>
> Key: FLINK-13732
> URL: https://issues.apache.org/jira/browse/FLINK-13732
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: Biao Liu
>Priority: Major
> Fix For: 1.10.0
>
>
> There is a requirement from the user mailing list [1]. I think it's reasonable 
> enough to support.
> The scenario is that when deploying a Flink cluster on YARN, there might be 
> several {{JM(RM)}}s running on the same host. IMO that's quite a general 
> scenario. However, we can't distinguish the metrics from different 
> {{JobManagerMetricGroup}}s, because there is only one variable "hostname" we 
> can use.
> I think there are some problems with the current implementation of 
> {{JobManagerMetricGroup}}. It's still non-FLIP-6 style. We should split the 
> metric group into {{RM}} and {{Dispatcher}} to match the FLIP-6 architecture. 
> And there should be an identification variable supported, just like {{tm_id}}.
> CC [~StephanEwen], [~till.rohrmann], [~Zentol]
> 1. 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-metrics-scope-for-YARN-single-job-td29389.html]





[jira] [Assigned] (FLINK-12887) Schedule UnfencedMessage would lost envelope info

2019-07-22 Thread Xiaogang Shi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi reassigned FLINK-12887:


Assignee: TisonKun  (was: Xiaogang Shi)

> Schedule UnfencedMessage would lost envelope info 
> --
>
> Key: FLINK-12887
> URL: https://issues.apache.org/jira/browse/FLINK-12887
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>
> We provide {{runAsync}}, {{callAsync}} and {{scheduleRunAsync}} for 
> {{MainThreadExecutable}}, while additionally providing {{runAsyncWithoutFencing}} 
> and {{callAsyncWithoutFencing}} for {{FencedMainThreadExecutable}}.
> Let's think about a case where we want to schedule an unfenced runnable or any 
> other unfenced message (currently we don't have such a code path, but it's 
> semantically valid):
> 1. {{FencedAkkaRpcActor}} receives an unfenced runnable with a delay.
> 2. It extracts the runnable from the unfenced message and calls 
> {{super.handleRpcMessage}}.
> 3. {{AkkaRpcActor}} envelopes the message and schedules it at 
> {{AkkaRpcActor#L410}}.
> However, {{FencedAkkaRpcActor#envelopeSelfMessage}} is called for the envelope. 
> Thus the unfenced message now becomes a fenced message.
> We could implement {{scheduleRunAsyncWithoutFencing}} to schedule the 
> unfenced message directly via {{actorsystem.scheduler.scheduleOnce(..., 
> dispatcher)}}, but in the current codebase I notice that {{RunAsync}} has a 
> weird {{atTimeNanos}} (i.e., delay) property. Ideally, how a message is scheduled 
> should be reflected in the params {{ScheduledExecutorService}} is called with; at 
> least we should not extract an unfenced message, envelope it into a fenced 
> message, and then schedule it, which has the wrong semantics.
> cc [~till.rohrmann]





[jira] [Assigned] (FLINK-12887) Schedule UnfencedMessage would lost envelope info

2019-07-22 Thread Xiaogang Shi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi reassigned FLINK-12887:


Assignee: Xiaogang Shi



[jira] [Commented] (FLINK-12887) Schedule UnfencedMessage would lost envelope info

2019-07-11 Thread Xiaogang Shi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16883233#comment-16883233
 ] 

Xiaogang Shi commented on FLINK-12887:
--

As described in my first comment, we are using 
{{scheduleRunAsyncWithoutFencing}} to kill leaked containers. When the Yarn RM 
restarts, it takes over containers from previous attempts, some of which 
may be stuck. Those containers must be killed to release resources. In our 
private version, we schedule a delayed operation to check whether the TMs in these 
containers register themselves with the RM in time. If a container's TM does not 
register itself in time, the container will be killed. Because the RM may not have 
been granted leadership yet when it restarts, the check operation must be scheduled 
without fencing.
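A rough sketch of such a delayed registration check (plain Java; container ids and timings are made up for illustration, and this is not our actual code):

```java
import java.util.Set;
import java.util.concurrent.*;

public class ContainerCheckSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical containers recovered from a previous RM attempt,
        // still waiting for their TMs to register.
        Set<String> unregistered = ConcurrentHashMap.newKeySet();
        unregistered.add("container-1");
        unregistered.add("container-2");

        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

        // container-1's TM registers in time; container-2's does not.
        unregistered.remove("container-1");

        // Delayed check (would be scheduled without fencing in the RM):
        // kill every container whose TM never registered.
        ScheduledFuture<?> check = timer.schedule(() -> {
            for (String c : unregistered) {
                System.out.println("killing stuck container " + c);
            }
        }, 50, TimeUnit.MILLISECONDS);
        check.get();
        timer.shutdown();
        if (!unregistered.contains("container-2")) throw new AssertionError();
    }
}
```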

For the scenario described, it's true that it could not happen in the current 
implementation, because we now enforce that the method {{setFencingToken}} is 
called in the main thread. I'm sorry for my confusing description. My point 
here is that by simply scheduling the message with the Akka dispatcher, we can 
avoid the inconvenience brought by unnecessary enveloping.

I think {{AkkaInvocationHandler}} knows the underlying {{AkkaRpcActor}}, as it 
is referenced by {{rpcEndpoint}}. I guess the problem is actually due to the 
unknown {{ActorSystem}}. If so, we can pass the actor's {{ActorSystem}} to the 
constructor of {{AkkaInvocationHandler}}, and use the {{ActorSystem}}'s 
dispatcher to schedule delayed {{RunAsync}} messages. What do you think? 
[~till.rohrmann]

 



[jira] [Commented] (FLINK-12887) Schedule UnfencedMessage would lost envelope info

2019-06-20 Thread Xiaogang Shi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16869118#comment-16869118
 ] 

Xiaogang Shi commented on FLINK-12887:
--

[~till.rohrmann] I am also very curious about the way we implement delayed 
{{runAsync}} operations. Currently we first send the {{runAsync}} message to the 
actor and then schedule the operation with the Akka dispatcher. There are two 
questions about this implementation:
1. It seems unnecessary to send the {{runAsync}} message to the actor first. 
Can we simply schedule the message with the Akka dispatcher?
2. The token is enveloped again by the actor. Rarely, but possibly, the token 
at submit time differs from the one at envelope time:
   t1. An RPC endpoint submits a {{runAsync}} operation.
   t2. The RPC endpoint loses its leadership.
   t3. The RPC endpoint is granted leadership again, creating a new fencing token.
   t4. The {{runAsync}} operation is executed by the actor. It is enveloped 
with the new fencing token and is scheduled by the Akka dispatcher.
   In such cases, an operation from a previous session will be executed. That may 
lead to unexpected results.
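The race can be sketched as follows (plain Java, illustrative only; capturing the fencing token at submit time rather than at envelope time is what prevents the stale execution):

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

public class FencingSketch {
    // Hypothetical current fencing token of the endpoint.
    static final AtomicReference<String> currentToken = new AtomicReference<>("session-1");
    static boolean executed = false;

    // Capture the token when the operation is SUBMITTED (t1), not when the
    // actor envelopes it later (t4).
    static Runnable fence(Runnable op) {
        String tokenAtSubmit = currentToken.get();
        return () -> {
            if (!Objects.equals(tokenAtSubmit, currentToken.get())) {
                System.out.println("dropped: stale token " + tokenAtSubmit);
            } else {
                op.run();
            }
        };
    }

    public static void main(String[] args) {
        Runnable op = fence(() -> executed = true); // t1: submit
        currentToken.set("session-2");              // t2 + t3: leadership lost and regained
        op.run();                                   // t4: stale operation is dropped
        if (executed) throw new AssertionError("operation from previous session must not run");
        System.out.println("stale operation dropped");
    }
}
```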
 




[jira] [Commented] (FLINK-12887) Schedule UnfencedMessage would lost envelope info

2019-06-20 Thread Xiaogang Shi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16869086#comment-16869086
 ] 

Xiaogang Shi commented on FLINK-12887:
--

Hi [~till.rohrmann], we are now using many unfenced asynchronous operations in 
the Yarn RM to process notifications from Yarn. Otherwise, the Yarn RM will miss 
some notifications when it has not been granted leadership.

Another case is the timers to release stuck containers. When a Yarn RM 
restarts, it recovers containers from previous attempts. Some containers 
may be stuck, and we should kill them to release resources. We now use timers 
to monitor these recovered containers and kill those containers whose task 
managers cannot register in time. The timers must be unfenced because the Yarn 
RM may not have been granted leadership when it recovers the containers.




[jira] [Commented] (FLINK-12865) State inconsistency between RM and TM on the slot status

2019-06-17 Thread Xiaogang Shi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16866148#comment-16866148
 ] 

Xiaogang Shi commented on FLINK-12865:
--

[~till.rohrmann] You are right that there is no problem with the postponed 
handling of slot requests. I revisited the code and found that we do use ask to 
send heartbeat requests, but the responses are not sent back to the 
{{PromiseActorRef}}. Instead, they are sent back directly to the RM via a separate 
RPC method, so the handling of the heartbeat responses will not be postponed. 

After revisiting the code, it seems that sending heartbeats in the main thread 
will fix the problem.

Thanks a lot for your explanation, and sorry for my misleading information.

> State inconsistency between RM and TM on the slot status
> 
>
> Key: FLINK-12865
> URL: https://issues.apache.org/jira/browse/FLINK-12865
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>
> There may be state inconsistency between the TM and the RM due to race 
> conditions and message loss:
>  # When the TM sends a heartbeat, it retrieves the SlotReport in the main 
> thread but sends the heartbeat in another thread. There may be cases where a 
> slot on the TM is initially FREE and the SlotReport reads the FREE state, then 
> the RM requests the slot and marks it as allocated, and the SlotReport finally 
> overrides the allocated status on the RM side wrongly.
>  # When the RM requests a slot, the TM receives the request but the acknowledge 
> message gets lost. Then the RM will think this slot is free. 
>  Both problems may cause the RM to mark an ALLOCATED slot as FREE. Currently 
> this may cause additional retries until the state is synchronized after the 
> next heartbeat, and in the future it would lead to inaccurate resource 
> statistics for fine-grained resource management.





[jira] [Commented] (FLINK-12863) Race condition between slot offerings and AllocatedSlotReport

2019-06-16 Thread Xiaogang Shi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865269#comment-16865269
 ] 

Xiaogang Shi commented on FLINK-12863:
--

By the way, I want to note that the race condition is not necessarily caused by 
{{HeartbeatManagerSenderImpl}} sending heartbeats in a separate thread. Fixing 
that can solve the problem in the JM, but not the one in the RM.

Even when the RM sends heartbeat requests in the main thread, right after a slot 
request, the heartbeat responses may be handled first by the RM. This is because 
the RM uses ask to send both heartbeat and slot requests. Temporary 
{{PromiseActor}}s are created to receive the responses from the TM. Since there is 
no guarantee on the execution order of actors, the {{PromiseActor}} which receives 
its response first may be executed later.



> Race condition between slot offerings and AllocatedSlotReport
> -
>
> Key: FLINK-12863
> URL: https://issues.apache.org/jira/browse/FLINK-12863
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>
> With FLINK-11059 we introduced the {{AllocatedSlotReport}} which is used by 
> the {{TaskExecutor}} to synchronize its internal view on slot allocations 
> with the view of the {{JobMaster}}. It seems that there is a race condition 
> between offering slots and receiving the report because the 
> {{AllocatedSlotReport}} is sent by the {{HeartbeatManagerSenderImpl}} from a 
> separate thread. 
> Due to that it can happen that we generate an {{AllocatedSlotReport}} just 
> before getting new slots offered. Since the report is sent from a different 
> thread, it can then happen that the response to the slot offerings is sent 
> earlier than the {{AllocatedSlotReport}}. Consequently, we might receive an 
> outdated slot report on the {{TaskExecutor}} causing active slots to be 
> released.
> In order to solve the problem I propose to add a fencing token to the 
> {{AllocatedSlotReport}} which is being updated whenever we offer new slots to 
> the {{JobMaster}}. When we receive the {{AllocatedSlotReport}} on the 
> {{TaskExecutor}} we compare the current slot report fencing token with the 
> received one and only process the report if they are equal. Otherwise we wait 
> for the next heartbeat to send us an up to date {{AllocatedSlotReport}}.





[jira] [Commented] (FLINK-12863) Race condition between slot offerings and AllocatedSlotReport

2019-06-16 Thread Xiaogang Shi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865239#comment-16865239
 ] 

Xiaogang Shi commented on FLINK-12863:
--

I think a similar problem happens in the heartbeats between the RM and the TM. 

When an RM receives a slot request from a JM, it will find an available slot, mark 
it as pending, and send a slot request to the TM. In the case where the slot 
request follows a heartbeat request, the RM will receive the heartbeat 
response first and remove the pending slot. The RM may then reuse the slot when it 
receives a new slot request from the JM, leading to duplicated slot allocation. 

A solution proposed by [~yungao.gy] is to use version numbers. Each slot is 
equipped with a version number, which is increased whenever a new pending request 
is generated. These version numbers are attached to the heartbeats sent to the 
TM. Once a heartbeat response is received, we skip removing those pending slot 
requests whose version numbers are greater than the versions carried by the 
heartbeat.

I think the solution can also work here. What do you think? 
[~yungao.gy] [~till.rohrmann]
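The version-number idea can be sketched as follows (plain Java, illustrative only; slot ids and the map representation are made up for this example):

```java
import java.util.HashMap;
import java.util.Map;

public class SlotVersionSketch {
    public static void main(String[] args) {
        // slotId -> version, bumped whenever a new pending request is created.
        Map<String, Integer> pendingVersion = new HashMap<>();
        pendingVersion.put("slot-1", 1);

        // The heartbeat to the TM carries the versions known at send time.
        Map<String, Integer> versionsInHeartbeat = new HashMap<>(pendingVersion);

        // A new pending request for the slot is created while the heartbeat
        // is in flight, bumping the version.
        pendingVersion.put("slot-1", 2);

        // On the heartbeat response, clear only those pending requests whose
        // version is not newer than the one the heartbeat carried.
        pendingVersion.entrySet().removeIf(e ->
            e.getValue() <= versionsInHeartbeat.getOrDefault(e.getKey(), 0));

        // The in-flight request (version 2 > 1) survives the stale response.
        if (!pendingVersion.containsKey("slot-1")) throw new AssertionError();
        System.out.println("slot-1 pending request kept (version 2 > 1)");
    }
}
```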



[jira] [Commented] (FLINK-12761) Fine grained resource management

2019-06-06 Thread Xiaogang Shi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16857448#comment-16857448
 ] 

Xiaogang Shi commented on FLINK-12761:
--

I think it's a good idea to enable fine-grained resource management in Flink. 

I've read the design document and found that many details are omitted. As the 
ResourceManager is a critical component in Flink and has many bugs right now 
(especially the YarnResourceManager), it would be appreciated if you could 
provide more details in your design document.

> Fine grained resource management
> 
>
> Key: FLINK-12761
> URL: https://issues.apache.org/jira/browse/FLINK-12761
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.8.0, 1.9.0
>Reporter: Tony Xintong Song
>Assignee: Tony Xintong Song
>Priority: Major
>  Labels: Umbrella
>
> This is an umbrella issue for enabling fine-grained resource management in 
> Flink.
> Fine-grained resource management is a big topic that requires long-term 
> effort. There are many issues to be addressed and design decisions to be 
> made, some of which may not be resolved in a short time. Here we propose our 
> design and implementation plan for the upcoming release 1.9, as well as our 
> thoughts and ideas on the long-term roadmap for this topic.
> A practical short-term target is to enable fine-grained resource management 
> for batch SQL jobs only in the upcoming Flink 1.9. This is necessary for the 
> batch operators added from Blink to achieve good performance.
> Please find the detailed design and implementation plan in the attached docs. 
> Any comments and feedback are welcome and appreciated.





[jira] [Created] (FLINK-12285) Memory leak in SavepointITCase and SavepointMigrationTestBase

2019-04-22 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-12285:


 Summary: Memory leak in SavepointITCase and 
SavepointMigrationTestBase
 Key: FLINK-12285
 URL: https://issues.apache.org/jira/browse/FLINK-12285
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Xiaogang Shi
Assignee: Biao Liu


The tests in {{SavepointITCase}} and {{SavepointMigrationTestBase}} do not 
cancel running jobs before exiting. This causes exceptions in {{TaskExecutor}}s 
and unreleased memory segments. Succeeding tests may fail due to an insufficient 
amount of memory.

The problem is caused by cancelling {{TaskExecutor}}s with running tasks. 
Another issue caused by the same reason can be seen in FLINK-11343. Maybe we can 
find a more dedicated method to cancel those {{TaskExecutor}}s that still have 
running tasks.





[jira] [Created] (FLINK-12284) InputBufferPoolUsage is incorrect in credit-based network control flow

2019-04-22 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-12284:


 Summary: InputBufferPoolUsage is incorrect in credit-based network 
control flow
 Key: FLINK-12284
 URL: https://issues.apache.org/jira/browse/FLINK-12284
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.8.0, 1.7.2, 1.6.4, 1.6.3
Reporter: Xiaogang Shi


When using credit-based flow control, exclusive buffers are assigned directly 
to {{RemoteInputChannel}}s and are not counted in the {{LocalBufferPool}}, 
leading to an incorrect InputBufferPoolUsage.





[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections

2019-04-12 Thread Xiaogang Shi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816118#comment-16816118
 ] 

Xiaogang Shi commented on FLINK-10052:
--

What's the status of this issue? [~Wosinsan]

{{SessionConnectionStateErrorPolicy}} was introduced in Curator 3.0, while Flink 
is using Curator 2.12.

Since Curator 3.x has compatibility problems with ZooKeeper 
([ZooKeeper Compatibility|https://curator.apache.org/zk-compatibility.html]), 
we should bump our Curator version to 4.x to use 
{{SessionConnectionStateErrorPolicy}}. 

What do you think? [~till.rohrmann]

> Tolerate temporarily suspended ZooKeeper connections
> 
>
> Key: FLINK-10052
> URL: https://issues.apache.org/jira/browse/FLINK-10052
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.4.2, 1.5.2, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Dominik Wosiński
>Priority: Major
>
> This issue results from FLINK-10011 which uncovered a problem with Flink's HA 
> recovery and proposed the following solution to harden Flink:
> The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator 
> recipe for leader election. The leader latch revokes leadership in case of a 
> suspended ZooKeeper connection. This can be premature in case that the system 
> can reconnect to ZooKeeper before its session expires. The effect of the lost 
> leadership is that all jobs will be canceled and directly restarted after 
> regaining the leadership.
> Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper 
> connection, it would be better to wait until the ZooKeeper connection is 
> LOST. That way we would allow the system to reconnect and not lose the 
> leadership. This could be achievable by using Curator's {{LeaderSelector}} 
> instead of the {{LeaderLatch}}.





[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)

2018-11-28 Thread Xiaogang Shi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702604#comment-16702604
 ] 

Xiaogang Shi commented on FLINK-10333:
--

I think so. We can wrap all access to ZK in the following way:
{code:java}
 client.inTransaction()
 .check().forPath(election-node-path).and()
 .setData(...).forPath(...).and()
 .commit();
{code}
where {{election-node-path}} is the path of the election znode. The check on 
that path ensures the JobManager is still the leader.

> Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, 
> CompletedCheckpoints)
> -
>
> Key: FLINK-10333
> URL: https://issues.apache.org/jira/browse/FLINK-10333
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.8.0
>
>
> While going over the ZooKeeper based stores 
> ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, 
> {{ZooKeeperCompletedCheckpointStore}}) and the underlying 
> {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were 
> introduced with past incremental changes.
> * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} 
> or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization 
> problems will either lead to removing the Znode or not
> * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of 
> exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case 
> of a failure)
> * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be 
> better to move {{RetrievableStateStorageHelper}} out of it for a better 
> separation of concerns
> * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even 
> if it is locked. This should not happen since it could leave another system 
> in an inconsistent state (imagine a changed {{JobGraph}} which restores from 
> an old checkpoint)
> * Redundant but also somewhat inconsistent put logic in the different stores
> * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} 
> which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}}
> * Getting rid of the {{SubmittedJobGraphListener}} would be helpful
> These problems made me think how reliable these components actually work. 
> Since these components are very important, I propose to refactor them.





[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)

2018-11-28 Thread Xiaogang Shi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701821#comment-16701821
 ] 

Xiaogang Shi commented on FLINK-10333:
--

hi [~StephanEwen], I think the key point to achieving atomicity is the use of 
ZooKeeper transactions, ensuring that every modification to ZooKeeper only 
takes effect when the corresponding election node exists. As far as I know, 
ZooKeeper transactions only support checks on the existence of given paths, 
and do not support checks on the payload. Hence I think adding the leader 
ID as payload to the ephemeral node does not help in achieving atomicity. 

What do you think?

> Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, 
> CompletedCheckpoints)
> -
>
> Key: FLINK-10333
> URL: https://issues.apache.org/jira/browse/FLINK-10333
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.8.0
>
>
> While going over the ZooKeeper based stores 
> ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, 
> {{ZooKeeperCompletedCheckpointStore}}) and the underlying 
> {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were 
> introduced with past incremental changes.
> * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} 
> or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization 
> problems will either lead to removing the Znode or not
> * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of 
> exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case 
> of a failure)
> * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be 
> better to move {{RetrievableStateStorageHelper}} out of it for a better 
> separation of concerns
> * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even 
> if it is locked. This should not happen since it could leave another system 
> in an inconsistent state (imagine a changed {{JobGraph}} which restores from 
> an old checkpoint)
> * Redundant but also somewhat inconsistent put logic in the different stores
> * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} 
> which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}}
> * Getting rid of the {{SubmittedJobGraphListener}} would be helpful
> These problems made me think how reliable these components actually work. 
> Since these components are very important, I propose to refactor them.





[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)

2018-11-27 Thread Xiaogang Shi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701495#comment-16701495
 ] 

Xiaogang Shi commented on FLINK-10333:
--

Hi [~till.rohrmann] Our cluster is suffering from unstable Zookeeper 
connections and I think this effort will help deal with some problems.

But we are still suffering from some problems in leader elections. The main 
cause is the lack of atomicity. For example, the JobMaster will write its 
address to another znode when it becomes the leader, but its leadership may 
already be lost by the time it writes the address (e.g., due to a long-time 
full GC). To alleviate the problem, many double checks are used in the code. 
Similar problems are also observed in the access to checkpoints. When an old 
job master loses its leadership, it may still have access to the checkpoints in 
ZooKeeper and may modify them. Various methods (including locks to disallow 
deletion and rescanning ZooKeeper on restoring) are deployed to deal with these 
exceptions, but they do not seem to be a perfect solution.

After diving deep into the implementation of leader election in ZooKeeper 
recipes, I have some ideas to improve our implementation. The basic idea is 
that we should guarantee that only the elected leader has access to 
ZooKeeper. In the ZooKeeper recipes, each leader contender creates an election 
znode which is SEQUENTIAL and EPHEMERAL under a certain path. The contender 
with the smallest sequence number is elected as the leader. When the elected 
leader fails, its election znode disappears and the contender with the 
smallest sequence number among the remaining contenders is elected as the new 
leader. So as long as a contender holds the leadership, its election znode 
must exist in ZooKeeper. Hence, we can record the election znode of each 
contender. Whenever a contender wants to modify something in ZooKeeper, it 
must put these modifications, together with a check on the existence of its 
election znode, in a ZooKeeper transaction. If the contender has already lost 
its leadership, the transaction will fail due to the unsatisfied check. That 
way, we can ensure only the elected leader has access to the state in ZooKeeper.

Currently, the ZooKeeper recipes do not expose any interface to access the path 
of election znodes. Maybe we need to reimplement leader election in Flink with 
native ZooKeeper interfaces, without using the ZooKeeper recipes.

What do you think of the idea?
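The selection rule described above (the contender whose SEQUENTIAL znode carries the smallest sequence number wins) can be illustrated with a small self-contained sketch. This is not Flink or Curator code; the znode naming convention ("prefix" plus a 10-digit sequence suffix appended by ZooKeeper) is the only assumption.

```java
import java.util.*;

// Illustrative sketch of the leader-election recipe's selection rule:
// each contender creates an EPHEMERAL_SEQUENTIAL znode, and the contender
// holding the smallest sequence number is the leader.
public class LeaderSelection {

    // ZooKeeper appends a 10-digit, zero-padded sequence to SEQUENTIAL znodes.
    static int sequenceOf(String znode) {
        return Integer.parseInt(znode.substring(znode.length() - 10));
    }

    // The current leader is the election znode with the smallest sequence.
    static String electLeader(List<String> electionZnodes) {
        return Collections.min(electionZnodes,
                Comparator.comparingInt(LeaderSelection::sequenceOf));
    }

    public static void main(String[] args) {
        List<String> contenders = Arrays.asList(
                "contender-0000000007",
                "contender-0000000003",
                "contender-0000000010");
        // contender-0000000003 holds the smallest sequence and is elected.
        System.out.println(electLeader(contenders));
    }
}
```

When the leader's EPHEMERAL znode disappears, re-running the same selection over the remaining znodes yields the next leader, which is exactly why checking the existence of one's own election znode inside a transaction proves continued leadership.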

> Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, 
> CompletedCheckpoints)
> -
>
> Key: FLINK-10333
> URL: https://issues.apache.org/jira/browse/FLINK-10333
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.8.0
>
>
> While going over the ZooKeeper based stores 
> ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, 
> {{ZooKeeperCompletedCheckpointStore}}) and the underlying 
> {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were 
> introduced with past incremental changes.
> * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} 
> or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization 
> problems will either lead to removing the Znode or not
> * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of 
> exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case 
> of a failure)
> * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be 
> better to move {{RetrievableStateStorageHelper}} out of it for a better 
> separation of concerns
> * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even 
> if it is locked. This should not happen since it could leave another system 
> in an inconsistent state (imagine a changed {{JobGraph}} which restores from 
> an old checkpoint)
> * Redundant but also somewhat inconsistent put logic in the different stores
> * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} 
> which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}}
> * Getting rid of the {{SubmittedJobGraphListener}} would be helpful
> These problems made me think how reliable these components actually work. 
> Since these components are very important, I propose to refactor them.





[jira] [Commented] (FLINK-10714) java.lang.IndexOutOfBoundsException when creating a heap backend snapshot

2018-10-29 Thread Xiaogang Shi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668009#comment-16668009
 ] 

Xiaogang Shi commented on FLINK-10714:
--

[~cmick] I came across a similar problem before. It seems that Kryo cannot 
properly serialize some collection types. I finally got rid of the problem by 
registering another serializer (e.g., JavaSerializer) for the problematic 
collection types in the ExecutionConfig.

> java.lang.IndexOutOfBoundsException when creating a heap backend snapshot
> -
>
> Key: FLINK-10714
> URL: https://issues.apache.org/jira/browse/FLINK-10714
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.5.5, 1.6.2
> Environment: Flink 1.6.2, FsStateBackend
>Reporter: Michał Ciesielczyk
>Priority: Blocker
> Fix For: 1.7.0
>
>
> I'm sometimes getting error while creating a checkpointing using a filesystem 
> state backend. This ONLY happens when asynchronous snapshots are enabled 
> using the FileSystem State Backend. When RocksDB is enabled everything works 
> fine.
>  
> I'm using a simple KeyedStream,mapWithState function with a ValueState 
> holding a  hashmap (scala.collection.immutable.Map). It's hard to reproduce 
> the error using a simple code snippet, as the error occurs randomly.
>  
> This issue may be similar to FLINK-7484 and FLINK-8836 (both already fixed), 
> but I'm still experiencing such behavior.
>   
> Stacktrace:
>  
> {code:java}
> java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>     at java.util.ArrayList.rangeCheck(ArrayList.java:657) ~[?:1.8.0_172]
>     at java.util.ArrayList.set(ArrayList.java:448) ~[?:1.8.0_172]
>     at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:56)
>  ~[kryo-shaded-4.0.0.jar:?]
>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:875) 
> ~[kryo-shaded-4.0.0.jar:?]
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:710) 
> ~[kryo-shaded-4.0.0.jar:?]
>     at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>  ~[flink-core-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) 
> ~[scala-library-2.11.12.jar:?]
>     at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) 
> ~[scala-library-2.11.12.jar:?]
>     at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>  ~[flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>  ~[flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
>  ~[flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:59)
>  ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>  ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>  ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>  ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) 
> [flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at 

[jira] [Commented] (FLINK-6557) RocksDbStateBackendTest fails on Windows

2017-05-14 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009910#comment-16009910
 ] 

Xiaogang Shi commented on FLINK-6557:
-

[~Zentol] Thanks a lot for pointing it out. As far as I know, the maximum 
length of directory paths on Windows is 260 characters, while the length of the 
path printed in the log is only 181. Could you provide more information (e.g., 
the exception stack trace) to help address the problem?

> RocksDbStateBackendTest fails on Windows
> 
>
> Key: FLINK-6557
> URL: https://issues.apache.org/jira/browse/FLINK-6557
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Tests
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>
> The {{RocksDbStateBackendTest}} fails on windows when incremental checkpoint 
> is enabled.
> Based on the exception i guess the file name is just simply too long:
> {code}
> org.rocksdb.RocksDBException: IO error: Failed to create dir: 
> /C:/Users/Zento/AppData/Local/Temp/junit572330160893758355/junit5754599533651878867/job-ecbdb9df76fd3a39108dac7c515e3214_op-Test_uuid-6a43f1f6-1f35-443e-945c-aab3643e62fc/chk-0.tmp:
>  Invalid argument
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6545) Make incremental checkpoints externalizable

2017-05-12 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16007785#comment-16007785
 ] 

Xiaogang Shi commented on FLINK-6545:
-

I prefer not to use {{SavepointSerializer}} to serialize/deserialize external 
checkpoints. Since the implementation of incremental checkpointing may vary a 
lot across state backends, it would be very tedious and error-prone for 
{{SavepointSerializer}} to support each kind of incremental state handle.

Given that checkpoints are not supposed to be backward-compatible, maybe we can 
simply use Java serialization for external checkpoints. What do you think?

> Make incremental checkpoints externalizable
> ---
>
> Key: FLINK-6545
> URL: https://issues.apache.org/jira/browse/FLINK-6545
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Incremental checkpoints are currently not externalizible. The missing piece 
> is familiarizing the {{SavepointSerializer}}(s) with the new state handles 
> classes that we added for incremental checkpointing. Currently, some of those 
> (e.g. 
> {{org.apache.flink.contrib.streaming.state.RocksDBIncrementalKeyedStateHandle}})
>  currently live in the contrib.rocksdb package and need to be migrated. We 
> can also push them to a different abstraction level, i.e. 
> {{IncrementalKeyedStateHandle}} with some private data, referenced existing 
> shared data (from previous checkpoints), and (presumably) newly created 
> shared data (first created by the current checkpoint).





[jira] [Commented] (FLINK-6467) Potential lack of synchronization w.r.t. newSstFiles in RocksDBKeyedStateBackend#releaseResources()

2017-05-09 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16002329#comment-16002329
 ] 

Xiaogang Shi commented on FLINK-6467:
-

[~te...@apache.org] Thanks for pointing out this problem. It's true that 
{{newSstFiles}} will be accessed by both the cancel thread and the 
materialization thread. But the materialization thread must have been stopped 
(due to the closing of the input/output streams and the interruption) by the 
time {{releaseResources}} is executed in the cancel thread, so no 
synchronization is added here. What do you think?

> Potential lack of synchronization w.r.t. newSstFiles in 
> RocksDBKeyedStateBackend#releaseResources()
> ---
>
> Key: FLINK-6467
> URL: https://issues.apache.org/jira/browse/FLINK-6467
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   if (canceled) {
> List statesToDiscard = new ArrayList<>();
> statesToDiscard.add(metaStateHandle);
> statesToDiscard.addAll(miscFiles.values());
> statesToDiscard.addAll(newSstFiles.values());
> {code}
> It seems access to newSstFiles should be protected by 
> stateBackend.asyncSnapshotLock





[jira] [Assigned] (FLINK-6504) Lack of synchronization on materializedSstFiles in RocksDBKEyedStateBackend

2017-05-09 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi reassigned FLINK-6504:
---

Assignee: Xiaogang Shi

> Lack of synchronization on materializedSstFiles in RocksDBKEyedStateBackend
> ---
>
> Key: FLINK-6504
> URL: https://issues.apache.org/jira/browse/FLINK-6504
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Xiaogang Shi
>
> Concurrent checkpoints could access `materializedSstFiles` in the 
> `RocksDBStateBackend` concurrently. This should be avoided.





[jira] [Created] (FLINK-6364) Implement incremental checkpointing in RocksDBStateBackend

2017-04-23 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-6364:
---

 Summary: Implement incremental checkpointing in RocksDBStateBackend
 Key: FLINK-6364
 URL: https://issues.apache.org/jira/browse/FLINK-6364
 Project: Flink
  Issue Type: Sub-task
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi


{{RocksDBStateBackend}} is well suited for incremental checkpointing because 
RocksDB is based on LSM trees, which record updates in new sst files, and all 
sst files are immutable. By materializing only the new sst files, we can 
significantly improve the performance of checkpointing.
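Because sst files are immutable, the set of files to materialize for a checkpoint is just the set difference against the files already uploaded by the previous checkpoint. A minimal, self-contained sketch of that core idea (not the actual {{RocksDBStateBackend}} implementation) looks like this:

```java
import java.util.*;

// Sketch of incremental checkpointing's core computation: since sst files
// are immutable, only the files absent from the previous checkpoint need
// to be materialized (uploaded) for the current one.
public class IncrementalSnapshot {

    static Set<String> newSstFiles(Set<String> previousCheckpoint,
                                   Set<String> currentFiles) {
        Set<String> delta = new HashSet<>(currentFiles);
        // Files already materialized by an earlier checkpoint are reused.
        delta.removeAll(previousCheckpoint);
        return delta;
    }

    public static void main(String[] args) {
        Set<String> previous = new HashSet<>(
                Arrays.asList("000001.sst", "000002.sst"));
        Set<String> current = new HashSet<>(
                Arrays.asList("000001.sst", "000002.sst", "000003.sst"));
        // Only 000003.sst needs to be uploaded for this checkpoint.
        System.out.println(newSstFiles(previous, current));
    }
}
```

In the real backend this is what makes reference counting of shared files necessary: a file skipped here is still referenced by the new checkpoint and must not be deleted when older checkpoints are subsumed.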





[jira] [Closed] (FLINK-6014) Allow the registration of state objects in checkpoints

2017-04-23 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi closed FLINK-6014.
---
Resolution: Fixed

Fixed via 218bed8b8e49b0e4c61c61f696a8f010eafea1b7 and 
aa21f853ab0380ec1f68ae1d0b7c8d9268da4533 

> Allow the registration of state objects in checkpoints
> --
>
> Key: FLINK-6014
> URL: https://issues.apache.org/jira/browse/FLINK-6014
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> This issue is the very first step towards incremental checkpointing. We 
> introduce a new state handle named {{CompositeStateHandle}} to be the base of 
> the snapshots taken by task components.  Known implementation may include 
> {{KeyedStateHandle}} (for {{KeyedStateBackend}}s), {{SubtaskState}} (for 
> subtasks, splits of {{JobVertex}}) and {{TaskState}} (for {{JobVertex}}s).
> Each {{CompositeStateHandle}} is composed of a collection of {{StateObject}s. 
> It should register all its state objects in {{StateRegistry}} when its 
> checkpoint is added into {{CompletedCheckpointStore}} (i.e., a pending 
> checkpoint completes or a complete checkpoint is reloaded in the recovery). 
> When a completed checkpoint is moved out of the {{CompletedCheckpointStore}}, 
> we should not simply discard all state objects in the checkpoint. With the 
> introduction of incremental checkpointing, a {{StateObject}} may be 
> referenced by different checkpoints. We should unregister all the state 
> objects contained in the {{StateRegistry}} first. Only those state objects 
> that are not referenced by any checkpoint can be deleted.





[jira] [Created] (FLINK-6284) Incorrect sorting of completed checkpoints in ZooKeeperCompletedCheckpointStore

2017-04-09 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-6284:
---

 Summary: Incorrect sorting of completed checkpoints in 
ZooKeeperCompletedCheckpointStore
 Key: FLINK-6284
 URL: https://issues.apache.org/jira/browse/FLINK-6284
 Project: Flink
  Issue Type: Bug
Reporter: Xiaogang Shi


Currently all completed checkpoints are sorted by their paths when they are 
recovered in {{ZooKeeperCompletedCheckpointStore}}. In cases where the latest 
checkpoint's id is not the largest in lexical order (e.g., "100" is smaller 
than "99" in lexical order), Flink will not recover from the latest completed 
checkpoint.

The problem can be easily observed by setting the checkpoint ids in 
{{ZooKeeperCompletedCheckpointStoreITCase#testRecover()}} to be 99, 100 and 
101. 

To fix the problem, we should explicitly sort the found checkpoints by their 
checkpoint ids, without using 
{{ZooKeeperStateHandleStore#getAllSortedByName()}}.
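The difference between lexical and numeric ordering can be shown with a small sketch. The path layout ("/checkpoints/&lt;id&gt;") is an assumption for illustration, not the store's actual znode naming:

```java
import java.util.*;

// Demonstrates why sorting checkpoint paths lexically picks the wrong
// "latest" checkpoint, and how sorting by the numeric id fixes it.
public class CheckpointSort {

    // Assumed path layout: the znode name ends with the checkpoint id.
    static long checkpointId(String path) {
        return Long.parseLong(path.substring(path.lastIndexOf('/') + 1));
    }

    public static void main(String[] args) {
        List<String> paths = new ArrayList<>(Arrays.asList(
                "/checkpoints/100", "/checkpoints/99", "/checkpoints/101"));

        paths.sort(Comparator.naturalOrder());            // lexical order
        System.out.println(paths.get(paths.size() - 1));  // prints /checkpoints/99

        paths.sort(Comparator.comparingLong(CheckpointSort::checkpointId));
        System.out.println(paths.get(paths.size() - 1));  // prints /checkpoints/101
    }
}
```

With ids 99, 100 and 101 (as in the {{testRecover()}} scenario above), lexical sorting reports 99 as the latest checkpoint, while numeric sorting correctly reports 101.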





[jira] [Commented] (FLINK-3089) State API Should Support Data Expiration

2017-04-09 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962347#comment-15962347
 ] 

Xiaogang Shi commented on FLINK-3089:
-

[~aljoscha] In the current implementation, each RocksDB timer is identified by 
the timer's key, namespace and timestamp. Because RocksDB does not need to 
iterate over the timers to find the one to delete, deleting a timer in the 
RocksDB timer service is very efficient.

> State API Should Support Data Expiration
> 
>
> Key: FLINK-3089
> URL: https://issues.apache.org/jira/browse/FLINK-3089
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Niels Basjes
>
> In some usecases (webanalytics) there is a need to have a state per visitor 
> on a website (i.e. keyBy(sessionid) ).
> At some point the visitor simply leaves and no longer creates new events (so 
> a special 'end of session' event will not occur).
> The only way to determine that a visitor has left is by choosing a timeout, 
> like "After 30 minutes no events we consider the visitor 'gone'".
> Only after this (chosen) timeout has expired should we discard this state.
> In the Trigger part of Windows we can set a timer and close/discard this kind 
> of information. But that introduces the buffering effect of the window (which 
> in some scenarios is unwanted).
> What I would like is to be able to set a timeout on a specific OperatorState 
> value which I can update afterwards.
> This makes it possible to create a map function that assigns the right value 
> and that discards the state automatically.





[jira] [Commented] (FLINK-6219) Add a state backend which supports sorting

2017-03-31 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950388#comment-15950388
 ] 

Xiaogang Shi commented on FLINK-6219:
-

I prefer sorted states (e.g., {{SortedMapState}}) over a new state backend to 
address the described problem. Some users have mentioned similar demands for 
sorted states, so I think we should provide them to facilitate the development 
of user applications.

The implementation of such sorted states, however, may be very challenging. In 
{{HeapStateBackend}}, we need to implement a data structure that supports both 
copy-on-write (for asynchronous snapshotting) and sorting. In 
{{RocksDBStateBackend}}, we need to find an efficient way to support 
customized sorting: though RocksDBJava allows customized comparators, 
performance degrades significantly once a customized comparator is used 
(to approximately 1/3 - 1/15 of the original QPS). 

It's critical to address the problems mentioned above. Otherwise, a 
{{ValueState}} whose value is a {{SortedMap}} remains the better way to sort 
user data under the same key.
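The access pattern a {{SortedMapState}} would have to offer (random-order insertion, in-order traversal, removal from the head) can be sketched with a plain {{TreeMap}}. This is only an illustration of the desired semantics, not a proposed Flink API:

```java
import java.util.*;

// Emulates the access pattern requested for sorted state: elements arrive
// out of order keyed by event time, and are processed/removed in key order.
public class SortedStateSketch {

    // Drain the map from its smallest key upward, returning the visit order.
    static List<Long> drainInOrder(NavigableMap<Long, String> state) {
        List<Long> order = new ArrayList<>();
        while (!state.isEmpty()) {
            order.add(state.pollFirstEntry().getKey());  // removal from the head
        }
        return order;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> state = new TreeMap<>();
        // Random-order insertion, as in the OVER-window example above.
        state.put(2L, "Hello");
        state.put(1L, "Hello");
        state.put(5L, "Hello");
        state.put(4L, "Hello");
        System.out.println(drainInOrder(state));  // prints [1, 2, 4, 5]
    }
}
```

The hard part noted above is precisely that neither backend offers this for free: a heap backend would need a copy-on-write variant of such an ordered structure, and RocksDB would need a byte-order-preserving key encoding to avoid the slow custom-comparator path.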

> Add a state backend which supports sorting
> --
>
> Key: FLINK-6219
> URL: https://issues.apache.org/jira/browse/FLINK-6219
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Table API & SQL
>Reporter: sunjincheng
>
> When we implement the OVER window of 
> [FLIP11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]
> We notice that we need a state backend which supports sorting, allows for 
> efficient insertion, traversal in order, and removal from the head. 
> For example: In event-time OVER window, we need to sort by time,If the datas 
> as follow:
> {code}
> (1L, 1, Hello)
> (2L, 2, Hello)
> (5L, 5, Hello)
> (4L, 4, Hello)
> {code}
> We randomly insert the datas, just like:
> {code}
> put((2L, 2, Hello)),put((1L, 1, Hello)),put((5L, 5, Hello)),put((4L, 4, 
> Hello)),
> {code}
> We deal with elements in time order:
> {code}
> process((1L, 1, Hello)),process((2L, 2, Hello)),process((4L, 4, 
> Hello)),process((5L, 5, Hello))
> {code}
> Welcome anyone to give feedback,And what do you think? [~xiaogang.shi] 
> [~aljoscha] [~fhueske] 





[jira] [Commented] (FLINK-6178) Allow upgrades to state serializers

2017-03-28 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944776#comment-15944776
 ] 

Xiaogang Shi commented on FLINK-6178:
-

[~tzulitai] Thanks a lot for your quick response. The changes to the interfaces 
in {{RuntimeContext}} sound great! They do help with the conversion of 
savepoints. Looking forward to them.

> Allow upgrades to state serializers
> ---
>
> Key: FLINK-6178
> URL: https://issues.apache.org/jira/browse/FLINK-6178
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Currently, users are locked in with the serializer implementation used to 
> write their state.
> This is suboptimal, as generally for users, it could easily be possible that 
> they wish to change their serialization formats / state schemas and types in 
> the future.
> This is an umbrella JIRA for the required tasks to make this possible.
> Here's an overview description of what to expect for the overall outcome of 
> this JIRA (the specific details are outlined in their respective subtasks):
> Ideally, the main user-facing change this would result in is that users 
> implementing their custom {{TypeSerializer}} s will also need to implement 
> hook methods that identify whether or not there is a change to the serialized 
> format or even a change to the serialized data type. It would be the user's 
> responsibility that the {{deserialize}} method can bridge the change between 
> the old / new formats.
> For Flink's built-in serializers that are automatically built using the 
> user's configuration (most notably the more complex {{KryoSerializer}} and 
> {{GenericArraySerializer}}), Flink should be able to automatically 
> "reconfigure" them using the new configuration, so that the reconfigured 
> versions can be used to de- / serialize previous state. This would require 
> knowledge of the previous configuration of the serializer, therefore 
> "serializer configuration metadata" will be added to savepoints.
> Note that for the first version of this, although additional infrastructure 
> (e.g. serializer reconfigure hooks, serializer configuration metadata in 
> savepoints) will be added to potentially allow Kryo version upgrade, this 
> JIRA will not cover this. Kryo has breaking binary formats across major 
> versions, and will most likely need some further changes. Therefore, for the 
> {{KryoSerializer}}, "upgrading" it simply means changes in the registration 
> of specific / default serializers, at least for now.
> Finally, we would need to add a "convertState" phase to the task lifecycle, 
> that takes place after the "open" phase and before checkpointing starts / the 
> task starts running. It can only happen after "open", because only then can 
> we be certain if any reconfiguration of state serialization has occurred, and 
> state needs to be converted. Ideally, the code for the "convertState" is 
> designed so that it can be easily exposed as an offline tool in the future.
> For this JIRA, we should simply assume that after {{open()}}, we have all the 
> required information and serializers are appropriately reconfigured. 
> [~srichter] is currently planning to deprecate RuntimeContext state 
> registration methods in favor of a new interface that enforces eager state 
> registration, so that we may have all the info after {{open()}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6178) Allow upgrades to state serializers

2017-03-28 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944743#comment-15944743
 ] 

Xiaogang Shi commented on FLINK-6178:
-

The idea of allowing upgrades to state serializers is excellent. But I have 
some concerns about the "convertState" phase. Currently, Flink has no 
knowledge of the serializers to use before users access the states (via the 
methods provided in {{RuntimeContext}}). That means we can only convert the 
states when users are about to access them. The conversion may be very costly, 
and the processing of data streams may be paused for quite a long time. 

Actually, I am very interested in the offline tool proposed for the future. 
Many efforts are currently made in the Flink runtime to allow restoring from 
old savepoints. They make the code very complicated and hard to follow. I 
would prefer to move them from the main program to the offline tool.

I think the offline tool also eases the burden on users implementing 
{{TypeSerializer}}s that must deserialize data in different serialization 
formats. They would only need to provide the new serializers to access the 
states stored in the savepoints.
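A minimal sketch of such an offline conversion, using a hypothetical {{SimpleSerializer}} interface (this is not Flink's actual {{TypeSerializer}} API): each record is read with the old serializer and re-written with the new one, so the running job never has to bridge the two formats.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

// Hypothetical serializer interface for illustration only -- this is not
// Flink's actual TypeSerializer API.
interface SimpleSerializer<T> {
    void serialize(T value, DataOutput out) throws IOException;
    T deserialize(DataInput in) throws IOException;
}

class OfflineStateConverter {

    // Old format: JDK modified-UTF strings.
    static final SimpleSerializer<String> OLD = new SimpleSerializer<String>() {
        public void serialize(String v, DataOutput out) throws IOException { out.writeUTF(v); }
        public String deserialize(DataInput in) throws IOException { return in.readUTF(); }
    };

    // New format: length-prefixed raw UTF-8 bytes.
    static final SimpleSerializer<String> NEW = new SimpleSerializer<String>() {
        public void serialize(String v, DataOutput out) throws IOException {
            byte[] b = v.getBytes(StandardCharsets.UTF_8);
            out.writeInt(b.length);
            out.write(b);
        }
        public String deserialize(DataInput in) throws IOException {
            byte[] b = new byte[in.readInt()];
            in.readFully(b);
            return new String(b, StandardCharsets.UTF_8);
        }
    };

    // Re-encodes one record offline: read with the old serializer, write with
    // the new one.
    static <T> byte[] convert(byte[] oldBytes, SimpleSerializer<T> oldSer,
                              SimpleSerializer<T> newSer) throws IOException {
        T value = oldSer.deserialize(new DataInputStream(new ByteArrayInputStream(oldBytes)));
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        newSer.serialize(value, new DataOutputStream(bos));
        return bos.toByteArray();
    }

    // Helper: encode with OLD, convert, decode with NEW.
    static String roundTrip(String value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        OLD.serialize(value, new DataOutputStream(bos));
        byte[] converted = convert(bos.toByteArray(), OLD, NEW);
        return NEW.deserialize(new DataInputStream(new ByteArrayInputStream(converted)));
    }
}
```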

> Allow upgrades to state serializers
> ---
>
> Key: FLINK-6178
> URL: https://issues.apache.org/jira/browse/FLINK-6178
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Currently, users are locked in with the serializer implementation used to 
> write their state.
> This is suboptimal, as users may easily wish to change their serialization 
> formats / state schemas and types in the future.
> This is an umbrella JIRA for the required tasks to make this possible.
> Here's an overview description of what to expect for the overall outcome of 
> this JIRA (the specific details are outlined in their respective subtasks):
> Ideally, the main user-facing change this would result in is that users 
> implementing their custom {{TypeSerializer}} s will also need to implement 
> hook methods that identify whether or not there is a change to the serialized 
> format or even a change to the serialized data type. It would be the user's 
> responsibility that the {{deserialize}} method can bridge the change between 
> the old / new formats.
> For Flink's built-in serializers that are automatically built using the 
> user's configuration (most notably the more complex {{KryoSerializer}} and 
> {{GenericArraySerializer}}), Flink should be able to automatically 
> "reconfigure" them using the new configuration, so that the reconfigured 
> versions can be used to de- / serialize previous state. This would require 
> knowledge of the previous configuration of the serializer, therefore 
> "serializer configuration metadata" will be added to savepoints.
> Note that for the first version of this, although additional infrastructure 
> (e.g. serializer reconfigure hooks, serializer configuration metadata in 
> savepoints) will be added to potentially allow Kryo version upgrade, this 
> JIRA will not cover this. Kryo has breaking binary formats across major 
> versions, and will most likely need some further changes. Therefore, for the 
> {{KryoSerializer}}, "upgrading" it simply means changes in the registration 
> of specific / default serializers, at least for now.
> Finally, we would need to add a "convertState" phase to the task lifecycle, 
> that takes place after the "open" phase and before checkpointing starts / the 
> task starts running. It can only happen after "open", because only then can 
> we be certain if any reconfiguration of state serialization has occurred, and 
> state needs to be converted. Ideally, the code for the "convertState" is 
> designed so that it can be easily exposed as an offline tool in the future.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-6096) Refactor the migration of old versioned savepoints

2017-03-17 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi reassigned FLINK-6096:
---

Assignee: Xiaogang Shi

> Refactor the migration of old versioned savepoints
> --
>
> Key: FLINK-6096
> URL: https://issues.apache.org/jira/browse/FLINK-6096
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Existing code for the migration of old-versioned savepoints does not allow us 
> to correctly deserialize classes that changed across versions. I think we 
> should create a migration package for each old-versioned savepoint and put 
> all the migrated classes of that savepoint there. A mapping can be deployed 
> to record those migrated classes so that we can correctly deserialize them.
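For Java-serialized classes, the class-name remapping described above can be sketched with the standard {{ObjectInputStream#resolveClass}} hook; the migration mapping itself is hypothetical, and Flink's real savepoint loaders are more involved than this.

```java
import java.io.*;
import java.util.Map;

// Sketch: when reading an old savepoint, serialized class names are looked up
// in a migration mapping and resolved to the classes in the per-version
// migration package. The mapping contents are hypothetical.
class MigrationObjectInputStream extends ObjectInputStream {
    private final Map<String, String> migratedClasses;

    MigrationObjectInputStream(InputStream in, Map<String, String> migratedClasses)
            throws IOException {
        super(in);
        this.migratedClasses = migratedClasses;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        // Redirect old class names to their migrated counterparts; unmapped
        // names resolve as usual.
        String mapped = migratedClasses.getOrDefault(desc.getName(), desc.getName());
        return Class.forName(mapped, false, getClass().getClassLoader());
    }
}
```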



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6096) Refactor the migration of old versioned savepoints

2017-03-17 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi updated FLINK-6096:

Component/s: State Backends, Checkpointing

> Refactor the migration of old versioned savepoints
> --
>
> Key: FLINK-6096
> URL: https://issues.apache.org/jira/browse/FLINK-6096
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>
> Existing code for the migration of old-versioned savepoints does not allow us 
> to correctly deserialize classes that changed across versions. I think we 
> should create a migration package for each old-versioned savepoint and put 
> all the migrated classes of that savepoint there. A mapping can be deployed 
> to record those migrated classes so that we can correctly deserialize them.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6096) Refactor the migration of old versioned savepoints

2017-03-17 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-6096:
---

 Summary: Refactor the migration of old versioned savepoints
 Key: FLINK-6096
 URL: https://issues.apache.org/jira/browse/FLINK-6096
 Project: Flink
  Issue Type: Improvement
Reporter: Xiaogang Shi


Existing code for the migration of old-versioned savepoints does not allow us 
to correctly deserialize classes that changed across versions. I think we 
should create a migration package for each old-versioned savepoint and put all 
the migrated classes of that savepoint there. A mapping can be deployed to 
record those migrated classes so that we can correctly deserialize them.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6055) Supported setting timers on a Non-Keyed Stream

2017-03-14 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925575#comment-15925575
 ] 

Xiaogang Shi commented on FLINK-6055:
-

I think it's very challenging: storing and restoring timers in non-keyed 
streams is very difficult. Do you have any ideas?

> Supported setting timers on a Non-Keyed Stream
> --
>
> Key: FLINK-6055
> URL: https://issues.apache.org/jira/browse/FLINK-6055
> Project: Flink
>  Issue Type: New Feature
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> After [FLINK-4460] Allow ProcessFunction on non-keyed streams, I want 
> supported setting timers on a Non-Keyed Stream. What do you think? 
> [~aljoscha] 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-13 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi updated FLINK-6034:

Description: 
Currently, the only type of snapshot in keyed streams is 
{{KeyGroupsStateHandle}}, which is full and stores the states one key group 
after another. With the introduction of incremental checkpointing, we need a 
higher-level abstraction of keyed snapshots to allow flexible snapshot formats. 

The implementation of {{KeyedStateHandle}} s may vary a lot in different 
backends. The only information needed in {{KeyedStateHandle}} s is their key 
group range. When recovering the job with a different degree of parallelism, 
{{KeyedStateHandle}} s will be assigned to those subtasks whose key group 
ranges overlap with their ranges.

  was:
Currently, the only type of snapshot in keyed streams is 
{{KeyGroupsStateHandle}}, which is full and stores the states one key group 
after another. With the introduction of incremental checkpointing, we need a 
higher-level abstraction of keyed snapshots to allow flexible snapshot formats. 

The implementation of {{KeyedStateHandle}}s may vary a lot in different 
backends. The only information needed in {{KeyedStateHandle}}s is their key 
group range. When recovering the job with a different degree of parallelism, 
{{KeyedStateHandle}}s will be assigned to those subtasks whose key group ranges 
overlap with their ranges.


> Add KeyedStateHandle for the snapshots in keyed streams
> ---
>
> Key: FLINK-6034
> URL: https://issues.apache.org/jira/browse/FLINK-6034
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, the only type of snapshot in keyed streams is 
> {{KeyGroupsStateHandle}}, which is full and stores the states one key group 
> after another. With the introduction of incremental checkpointing, we need a 
> higher-level abstraction of keyed snapshots to allow flexible snapshot formats. 
> The implementation of {{KeyedStateHandle}} s may vary a lot in different 
> backends. The only information needed in {{KeyedStateHandle}} s is their key 
> group range. When recovering the job with a different degree of parallelism, 
> {{KeyedStateHandle}} s will be assigned to those subtasks whose key group 
> ranges overlap with their ranges.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-13 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-6034:
---

 Summary: Add KeyedStateHandle for the snapshots in keyed streams
 Key: FLINK-6034
 URL: https://issues.apache.org/jira/browse/FLINK-6034
 Project: Flink
  Issue Type: Sub-task
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi


Currently, the only type of snapshot in keyed streams is 
{{KeyGroupsStateHandle}}, which is full and stores the states one key group 
after another. With the introduction of incremental checkpointing, we need a 
higher-level abstraction of keyed snapshots to allow flexible snapshot formats. 

The implementation of {{KeyedStateHandle}}s may vary a lot in different 
backends. The only information needed in {{KeyedStateHandle}}s is their key 
group range. When recovering the job with a different degree of parallelism, 
{{KeyedStateHandle}}s will be assigned to those subtasks whose key group ranges 
overlap with their ranges.
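The overlap-based assignment described above can be sketched as follows (illustrative classes, not Flink's actual key-group code): on rescaling, each new subtask picks exactly the handles whose key group range intersects its own.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of reassigning state handles by key group range on rescaling.
class KeyGroupAssignment {

    // An inclusive range of key group indices.
    static final class Range {
        final int start, end;
        Range(int start, int end) { this.start = start; this.end = end; }
        boolean overlaps(Range other) {
            return start <= other.end && other.start <= end;
        }
    }

    // Returns the indices of the handles relevant to one subtask's range.
    static List<Integer> assign(List<Range> handleRanges, Range subtaskRange) {
        List<Integer> picked = new ArrayList<>();
        for (int i = 0; i < handleRanges.size(); i++) {
            if (handleRanges.get(i).overlaps(subtaskRange)) {
                picked.add(i);
            }
        }
        return picked;
    }
}
```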



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6027) Ignore the exception thrown by the subsuming of old completed checkpoints

2017-03-13 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-6027:
---

 Summary: Ignore the exception thrown by the subsuming of old 
completed checkpoints
 Key: FLINK-6027
 URL: https://issues.apache.org/jira/browse/FLINK-6027
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi


When a checkpoint is added into the {{CompletedCheckpointStore}} via the 
method {{addCheckpoint()}}, the oldest checkpoints will be removed from the 
store if the number of stored checkpoints exceeds the given limit. The 
subsuming of old checkpoints may fail, making {{addCheckpoint()}} throw an 
exception that is caught by {{CheckpointCoordinator}}, which then deletes the 
states of the new checkpoint. Because the new checkpoint is still in the 
store, we may try to recover the job from it, but the recovery will fail as 
its states have all been deleted.

We should ignore the exceptions thrown by the subsuming of old checkpoints 
because we can always recover from the new checkpoint once it has been 
successfully added to the store. Ignoring them may leave some dirty data, but 
that is acceptable because it can be cleaned up with the cleanup hook to be 
introduced in the near future.
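A minimal sketch of the proposed behavior (the class and method names are illustrative, not Flink's actual {{CompletedCheckpointStore}}): adding a checkpoint succeeds even if discarding a subsumed old checkpoint fails.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch: a bounded store of completed checkpoints that swallows failures
// while discarding subsumed checkpoints.
class CheckpointStoreSketch {
    private final Deque<String> checkpoints = new ArrayDeque<>();
    private final int maxRetained;

    CheckpointStoreSketch(int maxRetained) { this.maxRetained = maxRetained; }

    void addCheckpoint(String checkpoint) {
        checkpoints.addLast(checkpoint);
        while (checkpoints.size() > maxRetained) {
            String subsumed = checkpoints.removeFirst();
            try {
                discard(subsumed); // may fail, e.g. a DFS hiccup
            } catch (Exception e) {
                // Ignore: the new checkpoint is usable either way; leftover
                // files become dirty data for a later cleanup hook.
            }
        }
    }

    // Simulates a discard that always fails.
    void discard(String checkpoint) throws Exception {
        throw new Exception("simulated discard failure of " + checkpoint);
    }

    String latest() { return checkpoints.peekLast(); }

    int size() { return checkpoints.size(); }
}
```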



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6014) Allow the registration of state objects in checkpoints

2017-03-09 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-6014:
---

 Summary: Allow the registration of state objects in checkpoints
 Key: FLINK-6014
 URL: https://issues.apache.org/jira/browse/FLINK-6014
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi


This issue is the very first step towards incremental checkpointing. We 
introduce a new state handle named {{CompositeStateHandle}} to be the base of 
the snapshots taken by task components. Known implementations include 
{{KeyedStateHandle}} (for {{KeyedStateBackend}}s), {{SubtaskState}} (for 
subtasks, the parallel instances of a {{JobVertex}}) and {{TaskState}} (for 
{{JobVertex}}s).

Each {{CompositeStateHandle}} is composed of a collection of {{StateObject}}s. 
It should register all its state objects in the {{StateRegistry}} when its 
checkpoint is added into the {{CompletedCheckpointStore}} (i.e., when a 
pending checkpoint completes or a completed checkpoint is reloaded during 
recovery).

When a completed checkpoint is moved out of the {{CompletedCheckpointStore}}, 
we should not simply discard all state objects in the checkpoint. With the 
introduction of incremental checkpointing, a {{StateObject}} may be referenced 
by multiple checkpoints. We should first unregister all the checkpoint's state 
objects from the {{StateRegistry}}; only those state objects that are no 
longer referenced by any checkpoint can be deleted.
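The registration scheme above amounts to reference counting; a minimal sketch (illustrative names, not Flink's actual registry): a shared state object may be deleted only when the last checkpoint referencing it unregisters it.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a reference-counting state registry for shared state objects.
class StateRegistrySketch {
    private final Map<String, Integer> refCounts = new HashMap<>();

    // Called when a checkpoint referencing the object is added to the store.
    void register(String stateObject) {
        refCounts.merge(stateObject, 1, Integer::sum);
    }

    // Called when a referencing checkpoint is subsumed. Returns true if the
    // object is now unreferenced and may safely be deleted.
    boolean unregister(String stateObject) {
        Integer count = refCounts.get(stateObject);
        if (count == null) {
            throw new IllegalStateException("not registered: " + stateObject);
        }
        if (count == 1) {
            refCounts.remove(stateObject);
            return true;
        }
        refCounts.put(stateObject, count - 1);
        return false;
    }
}
```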



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-5053) Incremental / lightweight snapshots for checkpoints

2017-03-08 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi reassigned FLINK-5053:
---

Assignee: Xiaogang Shi

> Incremental / lightweight snapshots for checkpoints
> ---
>
> Key: FLINK-5053
> URL: https://issues.apache.org/jira/browse/FLINK-5053
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Xiaogang Shi
>
> There is currently basically no difference between savepoints and checkpoints 
> in Flink and both are created through exactly the same process.
> However, savepoints and checkpoints have a slightly different meaning which 
> we should take into account to keep Flink efficient:
> - Savepoints are (typically infrequently) triggered by the user to create a 
> state from which the application can be restarted, e.g. because Flink, some 
> code, or the parallelism needs to be changed.
> - Checkpoints are (typically frequently) triggered by the System to allow for 
> fast recovery in case of failure, but keeping the job/system unchanged.
> This means that savepoints and checkpoints can have different properties in 
> that:
> - Savepoint should represent a state of the application, where 
> characteristics of the job (e.g. parallelism) can be adjusted for the next 
> restart. One example for things that savepoints need to be aware of are 
> key-groups. Savepoints can potentially be a little more expensive than 
> checkpoints, because they are usually created much less frequently, by the 
> user.
> - Checkpoints are frequently triggered by the system to allow for fast 
> failure recovery. However, failure recovery leaves all characteristics of the 
> job unchanged. Thus checkpoints do not have to be aware of those, e.g. think 
> again of key groups. Checkpoints should run faster than savepoint creation; 
> in particular, it would be nice to have incremental checkpoints.
> For a first approach, I would suggest the following steps/changes:
> - In checkpoint coordination: differentiate between triggering checkpoints 
> and savepoints. Introduce properties for checkpoints that describe their set 
> of abilities, e.g. "is-key-group-aware", "is-incremental".
> - In state handle infrastructure: introduce state handles that reflect 
> incremental checkpoints and drop full key-group awareness, i.e. covering 
> folders instead of files and not having keygroup_id -> file/offset mapping, 
> but keygroup_range -> folder?
> - Backend side: We should start with RocksDB by reintroducing something 
> similar to semi-async snapshots, but using 
> BackupableDBOptions::setShareTableFiles(true) and transferring only new 
> incremental outputs to HDFS. Notice that using RocksDB's internal backup 
> mechanism is giving up on the information about individual key-groups. But as 
> explained above, this should be totally acceptable for checkpoints, while 
> savepoints should use the key-group-aware fully async mode. Of course we also 
> need to implement the ability to restore from both types of snapshots.
> One problem in the suggested approach is still that even checkpoints should 
> support scale-down, in case only a smaller number of instances is available 
> on recovery.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-5917) Remove MapState.size()

2017-03-02 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi reassigned FLINK-5917:
---

Assignee: Xiaogang Shi

> Remove MapState.size()
> --
>
> Key: FLINK-5917
> URL: https://issues.apache.org/jira/browse/FLINK-5917
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Aljoscha Krettek
>Assignee: Xiaogang Shi
>
> I'm proposing to remove {{size()}} because it is a prohibitively expensive 
> operation and users might not be aware of it. Instead of {{size()}} users can 
> use an iterator over all mappings to determine the size; when doing this, they 
> will be aware of the fact that it is a costly operation.
> Right now, {{size()}} is only costly on the RocksDB state backend but I think 
> with future developments on the in-memory state backend it might also become 
> an expensive operation there.
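The iterator-based alternative can be sketched as follows (a plain {{Map}} stands in for the {{MapState}} interface): counting entries by iteration makes the linear cost explicit at the call site.

```java
import java.util.Map;

// Sketch: determine the size of a map-like state by iterating its entries,
// instead of a size() method that hides a linear scan.
class MapStateSize {
    static <K, V> int sizeByIteration(Iterable<Map.Entry<K, V>> entries) {
        int count = 0;
        for (Map.Entry<K, V> ignored : entries) {
            count++;
        }
        return count;
    }
}
```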



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5917) Remove MapState.size()

2017-03-02 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15891970#comment-15891970
 ] 

Xiaogang Shi commented on FLINK-5917:
-

+1 to remove the {{size()}} method due to its costly implementation. 

> Remove MapState.size()
> --
>
> Key: FLINK-5917
> URL: https://issues.apache.org/jira/browse/FLINK-5917
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Aljoscha Krettek
>
> I'm proposing to remove {{size()}} because it is a prohibitively expensive 
> operation and users might not be aware of it. Instead of {{size()}} users can 
> use an iterator over all mappings to determine the size; when doing this, they 
> will be aware of the fact that it is a costly operation.
> Right now, {{size()}} is only costly on the RocksDB state backend but I think 
> with future developments on the in-memory state backend it might also become 
> an expensive operation there.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5925) Clean up extracted RocksDB JNI library

2017-02-28 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889462#comment-15889462
 ] 

Xiaogang Shi commented on FLINK-5925:
-

+1 for the cleanup mechanism. Each component (e.g. {{TaskManager}}, {{Task}} 
and {{StateBackend}}) should be given a working directory and should place 
all its files in that directory. When the component exits, we can simply 
delete the directory to clean up unused files.
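A minimal sketch of the per-component working directory, assuming a recursive delete on exit is sufficient (illustrative, not Flink's actual components):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Sketch: a component keeps all its files under one directory and deletes the
// whole directory recursively when it exits.
class WorkingDirectory implements AutoCloseable {
    final Path dir;

    WorkingDirectory(String prefix) throws IOException {
        this.dir = Files.createTempDirectory(prefix);
    }

    @Override
    public void close() throws IOException {
        // Walk in reverse order so children are deleted before parents.
        try (Stream<Path> paths = Files.walk(dir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException ignored) {
                    // Best-effort cleanup on exit.
                }
            });
        }
    }
}
```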

> Clean up extracted RocksDB JNI library
> --
>
> Key: FLINK-5925
> URL: https://issues.apache.org/jira/browse/FLINK-5925
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>
> The {{RocksDBStateBackend}} extracts the RocksDB jni library from the RocksDB 
> dependency in a temp directory (see 
> {{RocksDBStateBackend#ensureRocksDBIsLoaded}}). This file is, however, never 
> removed. 
> In general, I think we should add a cleanup mechanism which cleans all 
> {{Task}} specific files after the {{Task}} has completed. The same applies to 
> the {{TaskManager}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5917) Remove MapState.size()

2017-02-27 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887094#comment-15887094
 ] 

Xiaogang Shi commented on FLINK-5917:
-

[~StephanEwen] We just use the cache to avoid the costly scanning. It's 
initialized the first time the {{size()}} method is called. After that, the 
cache is updated every time a new entry is inserted or an entry is removed. 
When the backend is closed, we can simply drop the cache. 

A better choice, I think, is to use a RocksDB entry to record the value of the 
size. We don't need to write the value into the entry every time it's updated; 
we can update it only when taking snapshots. But this requires states to be 
aware of checkpointing, which is missing in our current implementation.
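The cache described above can be sketched as follows (illustrative, not the actual RocksDB backend): the count is computed once on first access and then kept in sync on every insert and remove, so {{size()}} becomes constant time afterwards.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: a map state that lazily initializes a size cache and maintains it
// incrementally on inserts and removes.
class CountingMapState<K, V> {
    private final Map<K, V> entries = new HashMap<>();
    private long cachedSize = -1; // -1 means "not yet initialized"

    void put(K key, V value) {
        V previous = entries.put(key, value);
        if (cachedSize >= 0 && previous == null) {
            cachedSize++; // new key inserted
        }
    }

    void remove(K key) {
        V previous = entries.remove(key);
        if (cachedSize >= 0 && previous != null) {
            cachedSize--; // existing key removed
        }
    }

    long size() {
        if (cachedSize < 0) {
            cachedSize = entries.size(); // one-time scan on first access
        }
        return cachedSize;
    }
}
```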

> Remove MapState.size()
> --
>
> Key: FLINK-5917
> URL: https://issues.apache.org/jira/browse/FLINK-5917
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Aljoscha Krettek
>
> I'm proposing to remove {{size()}} because it is a prohibitively expensive 
> operation and users might not be aware of it. Instead of {{size()}} users can 
> use an iterator over all mappings to determine the size; when doing this, they 
> will be aware of the fact that it is a costly operation.
> Right now, {{size()}} is only costly on the RocksDB state backend but I think 
> with future developments on the in-memory state backend it might also become 
> an expensive operation there.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5917) Remove MapState.size()

2017-02-26 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15885019#comment-15885019
 ] 

Xiaogang Shi commented on FLINK-5917:
-

I think it's okay to remove this method. But a better choice is to use an 
in-memory cache to record the size of the state. That way, we can achieve a 
constant-time implementation of the {{size()}} method with little cost. I 
think this mechanism could also work for the heap states in the future.

> Remove MapState.size()
> --
>
> Key: FLINK-5917
> URL: https://issues.apache.org/jira/browse/FLINK-5917
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Aljoscha Krettek
>
> I'm proposing to remove {{size()}} because it is a prohibitively expensive 
> operation and users might not be aware of it. Instead of {{size()}} users can 
> use an iterator over all mappings to determine the size; when doing this, they 
> will be aware of the fact that it is a costly operation.
> Right now, {{size()}} is only costly on the RocksDB state backend but I think 
> with future developments on the in-memory state backend it might also become 
> an expensive operation there.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-5790) Use list types when ListStateDescriptor extends StateDescriptor

2017-02-23 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi closed FLINK-5790.
---
Resolution: Fixed

Fixed via d47446cafffe0d34d89488f6eb860aa139ceb3f1

> Use list types when ListStateDescriptor extends StateDescriptor
> ---
>
> Key: FLINK-5790
> URL: https://issues.apache.org/jira/browse/FLINK-5790
> Project: Flink
>  Issue Type: Improvement
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Flink keeps the state serializer in {{StateDescriptor}}, but it is the 
> serializer of the list elements that is put in {{ListStateDescriptor}}. The 
> implementation is a little confusing, and some backends need to construct the 
> state serializer from the element serializer by themselves.
> We should put an {{ArrayListSerializer}}, composed of the element serializer, 
> in the {{ListStateDescriptor}}. That saves the backends from constructing the 
> state serializer themselves.
> If a backend needs customized serialization of the state (e.g. 
> {{RocksDBStateBackend}}), it can still obtain the element serializer from the 
> {{ArrayListSerializer}}.
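A minimal sketch of the proposed composition, using a hypothetical {{Ser}} interface rather than Flink's actual {{TypeSerializer}}: the list serializer is built from the element serializer, and a backend that needs custom handling can still get the element serializer back.

```java
import java.io.*;
import java.util.ArrayList;

// Hypothetical serializer interface, for illustration only.
interface Ser<T> {
    void write(T value, DataOutput out) throws IOException;
    T read(DataInput in) throws IOException;
}

// Sketch of an element serializer for ints.
class IntSer implements Ser<Integer> {
    public void write(Integer v, DataOutput out) throws IOException { out.writeInt(v); }
    public Integer read(DataInput in) throws IOException { return in.readInt(); }
}

// Sketch of a list serializer composed of its element serializer.
class ArrayListSer<E> implements Ser<ArrayList<E>> {
    private final Ser<E> elementSerializer;

    ArrayListSer(Ser<E> elementSerializer) { this.elementSerializer = elementSerializer; }

    // Backends that need customized handling can still get the element serializer.
    Ser<E> getElementSerializer() { return elementSerializer; }

    public void write(ArrayList<E> list, DataOutput out) throws IOException {
        out.writeInt(list.size());
        for (E e : list) {
            elementSerializer.write(e, out);
        }
    }

    public ArrayList<E> read(DataInput in) throws IOException {
        int n = in.readInt();
        ArrayList<E> list = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            list.add(elementSerializer.read(in));
        }
        return list;
    }
}
```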



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-5863) Unify the serialization of queryable list states in different backends

2017-02-23 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi closed FLINK-5863.
---
Resolution: Fixed

We can make these changes as part of the ongoing refactoring of the queryable states.

> Unify the serialization of queryable list states in different backends
> --
>
> Key: FLINK-5863
> URL: https://issues.apache.org/jira/browse/FLINK-5863
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.3.0
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>Priority: Minor
>
> Now the deserialization of list states is implemented in 
> {{KvStateRequestSerializer}}. The serialization, however, is implemented 
> separately in each backend. 
> We should provide a method in {{KvStateRequestSerializer}} to remove the 
> redundant code.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5865) Throw original exception in states

2017-02-21 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi updated FLINK-5865:

Summary: Throw original exception in states  (was: Throw original exception 
in RocksDB states)

> Throw original exception in states
> --
>
> Key: FLINK-5865
> URL: https://issues.apache.org/jira/browse/FLINK-5865
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Now all exceptions thrown in RocksDB states are converted to 
> {{RuntimeException}}. This is unnecessary and prints useless stack traces in 
> the log.
> I think it's better to throw the original exception, without any wrapping.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5865) Throw original exception in RocksDB states

2017-02-21 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-5865:
---

 Summary: Throw original exception in RocksDB states
 Key: FLINK-5865
 URL: https://issues.apache.org/jira/browse/FLINK-5865
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.3.0
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi


Now all exceptions thrown in RocksDB states are converted to 
{{RuntimeException}}. This is unnecessary and prints useless stack traces in 
the log.

I think it's better to throw the original exception, without any wrapping.
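A minimal sketch of the difference (generic Java, not the actual RocksDB state code): declaring and rethrowing the original {{IOException}} keeps the real failure at the top of the stack trace, while wrapping it in a {{RuntimeException}} only adds a useless extra frame.

```java
import java.io.IOException;

// Sketch contrasting wrapped vs. original exception propagation.
class ThrowOriginal {

    // Wrapping style: the caller sees a RuntimeException whose cause is the
    // real failure, one level down in the trace.
    static void wrapped() {
        try {
            failingIo();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Rethrow style: the caller sees the original exception directly.
    static void unwrapped() throws IOException {
        failingIo();
    }

    static void failingIo() throws IOException {
        throw new IOException("disk error");
    }
}
```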



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-5863) Unify the serialization of queryable list states in different backends

2017-02-21 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi reassigned FLINK-5863:
---

Assignee: Xiaogang Shi

> Unify the serialization of queryable list states in different backends
> --
>
> Key: FLINK-5863
> URL: https://issues.apache.org/jira/browse/FLINK-5863
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.3.0
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>Priority: Minor
>
> Now the deserialization of list states is implemented in 
> {{KvStateRequestSerializer}}. The serialization, however, is implemented 
> separately in each backend. 
> We should provide a method in {{KvStateRequestSerializer}} to remove the 
> redundant code.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5863) Unify the serialization of queryable list states in different backends

2017-02-21 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-5863:
---

 Summary: Unify the serialization of queryable list states in 
different backends
 Key: FLINK-5863
 URL: https://issues.apache.org/jira/browse/FLINK-5863
 Project: Flink
  Issue Type: Improvement
  Components: Queryable State
Affects Versions: 1.3.0
Reporter: Xiaogang Shi
Priority: Minor


Now the deserialization of list states is implemented in 
{{KvStateRequestSerializer}}. The serialization, however, is implemented 
separately in each backend. 

We should provide a method in {{KvStateRequestSerializer}} to remove the 
redundant code.





[jira] [Closed] (FLINK-5036) Perform the grouping of keys in restoring instead of checkpointing

2017-02-15 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi closed FLINK-5036.
---
Resolution: Invalid

> Perform the grouping of keys in restoring instead of checkpointing
> --
>
> Key: FLINK-5036
> URL: https://issues.apache.org/jira/browse/FLINK-5036
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>
> Whenever taking snapshots of {{RocksDBKeyedStateBackend}}, the values in the 
> states will be written to different files according to their key groups. 
> This procedure is very costly when the state is very big. 
> Given that snapshot operations are performed much more frequently 
> than restores, we can leave the key groups as they are to improve the 
> overall performance. In other words, we can perform the grouping of keys in 
> restoring instead of in checkpointing.
> I think the implementation will be very similar to the restoring of 
> non-partitioned states. Each task will receive a collection of snapshots, each 
> of which contains a set of key groups. Each task will restore its state from 
> the given snapshots by picking the values in its assigned key groups.
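The restore-side grouping described above can be sketched roughly as follows. This is a minimal illustration with made-up types, not Flink's actual backend code: each task receives whole snapshots and keeps only the entries whose key group falls into its assigned range.

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreGrouping {
    /** A snapshot entry tagged with the key group its key hashes into. */
    public record Entry(int keyGroup, String key, String value) {}

    /** Keep only the entries whose key group lies in [start, end]. */
    public static List<Entry> restoreAssigned(List<Entry> snapshot, int start, int end) {
        List<Entry> restored = new ArrayList<>();
        for (Entry e : snapshot) {
            if (e.keyGroup() >= start && e.keyGroup() <= end) {
                restored.add(e);
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        List<Entry> snapshot = List.of(
                new Entry(0, "a", "1"), new Entry(3, "b", "2"), new Entry(7, "c", "3"));
        // A task owning key groups [0, 3] restores only the first two entries.
        System.out.println(restoreAssigned(snapshot, 0, 3).size());
    }
}
```

The checkpoint path stays cheap because it writes values in whatever order they already have; the filtering cost is paid only on the (much rarer) restore.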





[jira] [Closed] (FLINK-5023) Add get() method in State interface

2017-02-13 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi closed FLINK-5023.
---
Resolution: Won't Fix

The updates to the {{State}} interface would affect existing user code. We will 
not update the interface before Flink 2.0.

> Add get() method in State interface
> ---
>
> Key: FLINK-5023
> URL: https://issues.apache.org/jira/browse/FLINK-5023
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, the only method provided by the {{State}} interface is {{clear()}}. I 
> think we should provide another method called {{get()}} that returns the 
> structured value (e.g., value, list, or map) under the current key. 
> In fact, the functionality of {{get()}} is already implemented in all 
> types of states: e.g., {{value()}} in {{ValueState}} and {{get()}} in {{ListState}}. The 
> modification to the interface can better abstract these states.
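The proposed change could be sketched like this (a simplified stand-in for the interfaces, not Flink's actual code): {{get()}} moves into the common interface, and the existing type-specific accessors become aliases.

```java
public class StateGetSketch {
    /** Simplified stand-in for the State interface, with the proposed get(). */
    interface State<T> {
        T get();      // proposed: unified read access for all state kinds
        void clear();
    }

    /** A ValueState whose existing value() simply delegates to get(). */
    static class ValueState<T> implements State<T> {
        private T value;
        public void update(T v) { value = v; }
        public T value() { return get(); }   // existing accessor, now an alias
        public T get() { return value; }
        public void clear() { value = null; }
    }

    public static void main(String[] args) {
        ValueState<Integer> s = new ValueState<>();
        s.update(42);
        // Code written against State<T> can now read any state kind uniformly.
        System.out.println(s.get());
    }
}
```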





[jira] [Closed] (FLINK-5024) Add SimpleStateDescriptor to clarify the concepts

2017-02-13 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi closed FLINK-5024.
---
Resolution: Invalid

The state descriptors have now been refactored with the introduction of composite 
serializers (e.g. {{ArrayListSerializer}}).

> Add SimpleStateDescriptor to clarify the concepts
> -
>
> Key: FLINK-5024
> URL: https://issues.apache.org/jira/browse/FLINK-5024
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, StateDescriptors accept two type arguments: the first one is the 
> type of the created state and the second one is the type of the values in the 
> state. 
> The concepts, however, are a little confusing here, because in ListStates the 
> arguments passed to the StateDescriptors are the types of the list elements 
> instead of the lists. It also makes the implementation of MapStates difficult.
> I suggest not putting the type serializer in StateDescriptors, making 
> StateDescriptors independent of the data structures of the values. 
> A new type of StateDescriptor named SimpleStateDescriptor can be provided to 
> abstract those states (namely ValueState, ReducingState and FoldingState) 
> whose values are not composite. 
> The states (e.g. ListStates and MapStates) can implement their own 
> descriptors according to their data structures. 





[jira] [Created] (FLINK-5790) Use list types when ListStateDescriptor extends StateDescriptor

2017-02-13 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-5790:
---

 Summary: Use list types when ListStateDescriptor extends 
StateDescriptor
 Key: FLINK-5790
 URL: https://issues.apache.org/jira/browse/FLINK-5790
 Project: Flink
  Issue Type: Improvement
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi


Flink keeps the state serializer in {{StateDescriptor}}, but it is the 
serializer of the list elements that is put in {{ListStateDescriptor}}. The 
implementation is a little confusing, and some backends need to construct the 
state serializer from the element serializer by themselves.

We should put an {{ArrayListSerializer}}, composed of the element serializer, 
in the {{ListStateDescriptor}}. That saves the backend from constructing the 
state serializer itself.

If a backend needs customized serialization of the state (e.g. 
{{RocksDBStateBackend}}), it can still obtain the element serializer from the 
{{ArrayListSerializer}}.
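A rough sketch of the proposed composition (illustrative types and a naive encoding, not Flink's real serializer API): the descriptor holds a list serializer built from the element serializer, and a backend that wants per-element control can unwrap it.

```java
import java.io.ByteArrayOutputStream;
import java.util.List;

public class ArrayListSerializerSketch {
    interface Serializer<T> { byte[] serialize(T value); }

    /** Composed serializer: a list is encoded by concatenating element encodings. */
    static class ArrayListSerializer<E> implements Serializer<List<E>> {
        private final Serializer<E> elementSerializer;

        ArrayListSerializer(Serializer<E> elementSerializer) {
            this.elementSerializer = elementSerializer;
        }

        /** Backends needing custom per-element handling can still unwrap this. */
        Serializer<E> getElementSerializer() { return elementSerializer; }

        public byte[] serialize(List<E> list) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            out.write(list.size());                      // naive one-byte length prefix
            for (E e : list) {
                byte[] bytes = elementSerializer.serialize(e);
                out.write(bytes, 0, bytes.length);
            }
            return out.toByteArray();
        }
    }

    public static void main(String[] args) {
        Serializer<Integer> intSer = v -> new byte[] { v.byteValue() };
        ArrayListSerializer<Integer> listSer = new ArrayListSerializer<>(intSer);
        // A 3-element list: one length byte plus three element bytes.
        System.out.println(listSer.serialize(List.of(1, 2, 3)).length);
    }
}
```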





[jira] [Created] (FLINK-5738) Destroy created backend when task is canceled

2017-02-07 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-5738:
---

 Summary: Destroy created backend when task is canceled
 Key: FLINK-5738
 URL: https://issues.apache.org/jira/browse/FLINK-5738
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.2.0
Reporter: Xiaogang Shi


When a task is canceled, the {{ClosableRegistry}} will be closed in the cancel 
thread. However, the task may still be creating the {{KeyedStateBackend}}, and 
it will then fail to register the backend with the {{ClosableRegistry}}. Because 
the backend has not been assigned to the operator yet (due to the exception), 
the backend will not be destroyed when the task thread exits.

A simple solution is to catch the exception thrown during registration and 
destroy the created backend in case of failure. 
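The proposed fix could be sketched like this, using simplified stand-ins for the registry and the backend rather than Flink's actual classes: if registering the freshly created backend fails because the registry was closed by a concurrent cancel, dispose the backend before rethrowing.

```java
import java.io.IOException;

public class BackendCleanupSketch {
    /** Stand-in for the ClosableRegistry: rejects registration once closed. */
    static class ClosableRegistry {
        private boolean closed;
        synchronized void close() { closed = true; }
        synchronized void register(Backend b) throws IOException {
            if (closed) throw new IOException("registry already closed");
        }
    }

    static class Backend {
        boolean disposed;
        void dispose() { disposed = true; }
    }

    /** Create the backend and register it, disposing it if registration fails. */
    static Backend createBackend(ClosableRegistry registry) throws IOException {
        Backend backend = new Backend();
        try {
            registry.register(backend);
        } catch (IOException e) {
            backend.dispose();   // the proposed cleanup on cancellation
            throw e;
        }
        return backend;
    }

    public static void main(String[] args) {
        ClosableRegistry registry = new ClosableRegistry();
        registry.close();        // simulate a concurrent cancel
        try {
            createBackend(registry);
        } catch (IOException expected) {
            // The backend was already disposed inside createBackend.
        }
    }
}
```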





[jira] [Commented] (FLINK-5590) Create a proper internal state hierarchy

2017-01-21 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833260#comment-15833260
 ] 

Xiaogang Shi commented on FLINK-5590:
-

[~StephanEwen], do you have any ideas of the solution?
I think {{KvState}} already provides some needed internal methods. Maybe we can 
extend it to create the internal state hierarchy?

> Create a proper internal state hierarchy
> 
>
> Key: FLINK-5590
> URL: https://issues.apache.org/jira/browse/FLINK-5590
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Currently, the state interfaces (like {{ListState}}, {{ValueState}}, 
> {{ReducingState}}) are very sparse and contain only methods exposed to the 
> users. That makes sense in order to keep the public, stable API minimal.
> At the same time, the runtime needs more methods for its internal interaction 
> with state, such as:
>   - setting namespaces
>   - accessing raw values
>   - merging namespaces
> These are currently realized by re-creating or re-obtaining the state objects 
> from the KeyedStateBackend. That method causes quite an overhead for each 
> access to the state
> The KeyedStateBackend tries to do some tricks to reduce that overhead, but 
> does so only partially and induces other overhead in the process.
> The root cause of all these issues is a problem in the design: There is no 
> proper "internal state abstraction" in a similar way as there is an external 
> state abstraction (the public state API).
> We should add a similar hierarchy of states for the internal methods. It 
> would look like in the example below:
> {code}
>  *                    State
>  *                      |
>  *                      +-------------------InternalKvState
>  *                      |                          |
>  *                MergingState                     |
>  *                      |                          |
>  *                      +-----------------InternalMergingState
>  *                      |                          |
>  *           +----------+---------+                |
>  *           |                    |                |
>  *    ReducingState           ListState            |
>  *           |                    |                |
>  *           |                    +----------------+--InternalListState
>  *           |                                     |
>  *           +-------------------------------------+--InternalReducingState
> {code}





[jira] [Assigned] (FLINK-5544) Implement Internal Timer Service in RocksDB

2017-01-21 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi reassigned FLINK-5544:
---

Assignee: Xiaogang Shi

> Implement Internal Timer Service in RocksDB
> ---
>
> Key: FLINK-5544
> URL: https://issues.apache.org/jira/browse/FLINK-5544
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Now the only implementation of internal timer service is 
> HeapInternalTimerService which stores all timers in memory. In the cases 
> where the number of keys is very large, the timer service will cost too much 
> memory. An implementation that stores timers in RocksDB seems a good way to 
> deal with these cases.
> It might be a little challenging to implement a RocksDB timer service because 
> the timers are accessed in different ways. When timers are triggered, we need 
> to access timers in the order of timestamp. But when performing checkpoints, 
> we must have a method to obtain all timers of a given key group.
> A good implementation, as suggested by [~StephanEwen], follows the idea of 
> merge sorting. We can store timers in RocksDB with the format 
> {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put 
> together and are sorted. 
> Then we can deploy an in-memory heap which keeps the first timer of each key 
> group to get the next timer to trigger. When a key group's first timer is 
> updated, we can efficiently update the heap.





[jira] [Updated] (FLINK-5544) Implement Internal Timer Service in RocksDB

2017-01-17 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi updated FLINK-5544:

Issue Type: New Feature  (was: Bug)

> Implement Internal Timer Service in RocksDB
> ---
>
> Key: FLINK-5544
> URL: https://issues.apache.org/jira/browse/FLINK-5544
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Xiaogang Shi
>
> Now the only implementation of internal timer service is 
> HeapInternalTimerService which stores all timers in memory. In the cases 
> where the number of keys is very large, the timer service will cost too much 
> memory. An implementation that stores timers in RocksDB seems a good way to 
> deal with these cases.
> It might be a little challenging to implement a RocksDB timer service because 
> the timers are accessed in different ways. When timers are triggered, we need 
> to access timers in the order of timestamp. But when performing checkpoints, 
> we must have a method to obtain all timers of a given key group.
> A good implementation, as suggested by [~StephanEwen], follows the idea of 
> merge sorting. We can store timers in RocksDB with the format 
> {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put 
> together and are sorted. 
> Then we can deploy an in-memory heap which keeps the first timer of each key 
> group to get the next timer to trigger. When a key group's first timer is 
> updated, we can efficiently update the heap.





[jira] [Created] (FLINK-5544) Implement Internal Timer Service in RocksDB

2017-01-17 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-5544:
---

 Summary: Implement Internal Timer Service in RocksDB
 Key: FLINK-5544
 URL: https://issues.apache.org/jira/browse/FLINK-5544
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Xiaogang Shi


Now the only implementation of internal timer service is 
HeapInternalTimerService which stores all timers in memory. In the cases where 
the number of keys is very large, the timer service will cost too much memory. 
An implementation that stores timers in RocksDB seems a good way to deal with 
these cases.

It might be a little challenging to implement a RocksDB timer service because 
the timers are accessed in different ways. When timers are triggered, we need 
to access timers in the order of timestamp. But when performing checkpoints, we 
must have a method to obtain all timers of a given key group.

A good implementation, as suggested by [~StephanEwen], follows the idea of 
merge sorting. We can store timers in RocksDB with the format 
{{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put 
together and are sorted. 

Then we can deploy an in-memory heap which keeps the first timer of each key 
group to get the next timer to trigger. When a key group's first timer is 
updated, we can efficiently update the heap.
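The layout and heap described above can be sketched as follows. This is a simplified in-memory model: a sorted set stands in for RocksDB's lexicographically ordered key space, and the tuple comparator mimics a {{KEY_GROUP#TIMESTAMP#KEY}} byte layout. All names are illustrative.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.TreeSet;

public class TimerStoreSketch {
    record Timer(int keyGroup, long timestamp, String key) {}

    // RocksDB orders keys lexicographically; encoding KEY_GROUP#TIMESTAMP#KEY
    // makes the timers of a key group contiguous and sorted by timestamp.
    static final Comparator<Timer> ROCKSDB_ORDER =
            Comparator.comparingInt(Timer::keyGroup)
                      .thenComparingLong(Timer::timestamp)
                      .thenComparing(Timer::key);

    private final TreeSet<Timer> store = new TreeSet<>(ROCKSDB_ORDER);

    void register(Timer t) { store.add(t); }

    /** Next timer to fire: the minimum, by timestamp, over each group's first timer. */
    Timer nextToTrigger() {
        // In the real design this would be an incrementally maintained heap of
        // per-group head timers; here we rebuild it on each call for clarity.
        PriorityQueue<Timer> heads =
                new PriorityQueue<>(Comparator.comparingLong(Timer::timestamp));
        Integer group = null;
        for (Timer t : store) {
            if (group == null || t.keyGroup() != group) {
                heads.add(t);          // first (earliest) timer of this key group
                group = t.keyGroup();
            }
        }
        return heads.peek();
    }

    public static void main(String[] args) {
        TimerStoreSketch s = new TimerStoreSketch();
        s.register(new Timer(1, 50, "b"));
        s.register(new Timer(0, 90, "a"));
        s.register(new Timer(1, 20, "c"));
        System.out.println(s.nextToTrigger().timestamp());   // earliest overall: 20
    }
}
```

Checkpointing a key group is then just a contiguous range scan over the store, which is the property the comment above is after.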





[jira] [Commented] (FLINK-5398) Exclude generated files in module flink-batch-connectors in license checking

2016-12-29 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15786803#comment-15786803
 ] 

Xiaogang Shi commented on FLINK-5398:
-

[~fhueske] Thanks for your explanation. It helps.

> Exclude generated files in module flink-batch-connectors in license checking
> 
>
> Key: FLINK-5398
> URL: https://issues.apache.org/jira/browse/FLINK-5398
> Project: Flink
>  Issue Type: Bug
>Reporter: Xiaogang Shi
>
> Now the master branch fails to execute {{mvn install}} due to unlicensed 
> files in the module flink-batch-connectors. We should exclude these generated 
> files in the pom file.
> Unapproved licenses:
>   
> flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Address.java
>   
> flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
>   
> flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Fixed16.java
>   
> flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java





[jira] [Updated] (FLINK-5400) Add accessor to folding states in RuntimeContext

2016-12-29 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi updated FLINK-5400:

Description: Now {{RuntimeContext}} does not provide the accessors to 
folding states. Therefore users cannot use folding states in their rich 
functions. I think we should provide the missing accessor.  (was: Now 
{{RuntimeContext}} does provide the accessors to folding states. Therefore 
users cannot use folding states in their rich functions. I think we should 
provide the missing accessor.)

> Add accessor to folding states in RuntimeContext
> 
>
> Key: FLINK-5400
> URL: https://issues.apache.org/jira/browse/FLINK-5400
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>
> Now {{RuntimeContext}} does not provide the accessors to folding states. 
> Therefore users cannot use folding states in their rich functions. I think we 
> should provide the missing accessor.





[jira] [Created] (FLINK-5400) Add accessor to folding states in RuntimeContext

2016-12-29 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-5400:
---

 Summary: Add accessor to folding states in RuntimeContext
 Key: FLINK-5400
 URL: https://issues.apache.org/jira/browse/FLINK-5400
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Xiaogang Shi


Now {{RuntimeContext}} does provide the accessors to folding states. Therefore 
users cannot use folding states in their rich functions. I think we should 
provide the missing accessor.





[jira] [Created] (FLINK-5398) Exclude generated files in module flink-batch-connectors in license checking

2016-12-28 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-5398:
---

 Summary: Exclude generated files in module flink-batch-connectors 
in license checking
 Key: FLINK-5398
 URL: https://issues.apache.org/jira/browse/FLINK-5398
 Project: Flink
  Issue Type: Bug
Reporter: Xiaogang Shi


Now the master branch fails to execute {{mvn install}} due to unlicensed files 
in the module flink-batch-connectors. We should exclude these generated files 
in the pom file.

Unapproved licenses:

  
flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Address.java
  
flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
  
flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Fixed16.java
  
flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java
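One way to exclude such generated sources is through the Apache Rat Maven plugin's exclude list. The fragment below is a hypothetical sketch, not the actual change; the exact pom location and path patterns are assumptions.

```xml
<!-- Hypothetical pom.xml fragment: tell apache-rat-plugin to skip the
     Avro-generated test sources when checking license headers. -->
<plugin>
  <groupId>org.apache.rat</groupId>
  <artifactId>apache-rat-plugin</artifactId>
  <configuration>
    <excludes>
      <!-- Generated by the Avro compiler; files carry no license headers. -->
      <exclude>flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/**</exclude>
    </excludes>
  </configuration>
</plugin>
```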





[jira] [Commented] (FLINK-5397) Fail to deserialize savepoints in v1.1 when there exist missing fields in class serialization descriptors

2016-12-28 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15782811#comment-15782811
 ] 

Xiaogang Shi commented on FLINK-5397:
-

The idea works and is better. Many thanks for the quick fix :)

> Fail to deserialize savepoints in v1.1 when there exist missing fields in 
> class serialization descriptors
> -
>
> Key: FLINK-5397
> URL: https://issues.apache.org/jira/browse/FLINK-5397
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Stefan Richter
>
> To restore from savepoints taken in previous versions, Flink now keeps all 
> classes whose serialization has changed and puts them in a separate package 
> ("migration"). 
> When deserializing the old savepoints, Flink will look up the correct 
> descriptors ({{ObjectStreamClass}}) for these classes instead of using the 
> ones written in the serialized data. The implementation, however, is 
> problematic when field descriptors are missing from the serialized data. 
> When serializing an object, Java only writes the descriptors of the non-null 
> fields. But when we look up class descriptors from the given classes, all 
> fields are put into the descriptors. As a result, we will deserialize the 
> savepoints with incorrect descriptors, leading to serialization 
> exceptions.
> A simple resolution is to update the names of the read descriptors using 
> reflection, instead of substituting different descriptors.





[jira] [Updated] (FLINK-5397) Fail to deserialize savepoints in v1.1 when there exist missing fields in class serialization descriptors

2016-12-28 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi updated FLINK-5397:

Description: 
To restore from savepoints taken in previous versions, Flink now keeps all 
classes whose serialization has changed and puts them in a separate package 
("migration"). 

When deserializing the old savepoints, Flink will look up the correct 
descriptors ({{ObjectStreamClass}}) for these classes instead of using the ones 
written in the serialized data. The implementation, however, is problematic 
when field descriptors are missing from the serialized data. 

When serializing an object, Java only writes the descriptors of the non-null 
fields. But when we look up class descriptors from the given classes, all 
fields are put into the descriptors. As a result, we will deserialize the 
savepoints with incorrect descriptors, leading to serialization exceptions.

A simple resolution is to update the names of the read descriptors using 
reflection, instead of substituting different descriptors.

  was:
To restore from the savepoints in previous versions, Flink now keeps all 
classes whose serialization is changed and put them in a separated package 
("migration"). 

When deserializing the old savepoints, flink will look up correct descriptors 
({{ObjectStreamClass}}) for these classes, without using those ones written in 
serialized data. The implementation however is problematic when there exist 
missing field descriptors in the serialized data. 

When deserializing an object, Java will only write the descriptors of those 
non-null fields. But when we look up class descriptors with given classes, all 
fields will be put into the descriptors. As a result, we will deserialize the 
savepoints with incorrect descriptors, leading to serialization exceptions.

A simple resolution is to update the name of read descriptors using 
Reflections, without using a different descriptors.


> Fail to deserialize savepoints in v1.1 when there exist missing fields in 
> class serialization descriptors
> -
>
> Key: FLINK-5397
> URL: https://issues.apache.org/jira/browse/FLINK-5397
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>
> To restore from savepoints taken in previous versions, Flink now keeps all 
> classes whose serialization has changed and puts them in a separate package 
> ("migration"). 
> When deserializing the old savepoints, Flink will look up the correct 
> descriptors ({{ObjectStreamClass}}) for these classes instead of using the 
> ones written in the serialized data. The implementation, however, is 
> problematic when field descriptors are missing from the serialized data. 
> When serializing an object, Java only writes the descriptors of the non-null 
> fields. But when we look up class descriptors from the given classes, all 
> fields are put into the descriptors. As a result, we will deserialize the 
> savepoints with incorrect descriptors, leading to serialization 
> exceptions.
> A simple resolution is to update the names of the read descriptors using 
> reflection, instead of substituting different descriptors.





[jira] [Created] (FLINK-5397) Fail to deserialize savepoints in v1.1 when there exist missing fields in class serialization descriptors

2016-12-28 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-5397:
---

 Summary: Fail to deserialize savepoints in v1.1 when there exist 
missing fields in class serialization descriptors
 Key: FLINK-5397
 URL: https://issues.apache.org/jira/browse/FLINK-5397
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Xiaogang Shi


To restore from the savepoints in previous versions, Flink now keeps all 
classes whose serialization is changed and put them in a separated package 
("migration"). 

When deserializing the old savepoints, flink will look up correct descriptors 
({{ObjectStreamClass}}) for these classes, without using those ones written in 
serialized data. The implementation however is problematic when there exist 
missing field descriptors in the serialized data. 

When deserializing an object, Java will only write the descriptors of those 
non-null fields. But when we look up class descriptors with given classes, all 
fields will be put into the descriptors. As a result, we will deserialize the 
savepoints with incorrect descriptors, leading to serialization exceptions.

A simple resolution is to update the name of read descriptors using 
Reflections, without using a different descriptors.





[jira] [Commented] (FLINK-5214) Clean up checkpoint files when failing checkpoint operation on TM

2016-11-30 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15710512#comment-15710512
 ] 

Xiaogang Shi commented on FLINK-5214:
-

I opened FLINK-5086 to report a similar problem, but I do not have a good idea 
how to resolve it. 

Because the JM does not know about the existence of these checkpoint files, it 
seems only the TM can delete them. But as a failed TM may not be recovered by 
the JM if the number of retries exceeds the given limit, these files will not 
be deleted in such cases.

One possible solution, I think, is to let each TM return a handle to the JM 
when the TM is registered. The JM can use the handle to clean up the files even 
when the TM fails. 

Another solution is to recover the TM when the number of retries exceeds the 
limit. Once the TM is recovered, the only thing it does is clean up the 
checkpoint files.

Do you have any better ideas?

> Clean up checkpoint files when failing checkpoint operation on TM
> -
>
> Key: FLINK-5214
> URL: https://issues.apache.org/jira/browse/FLINK-5214
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4
>
>
> When the {{StreamTask#performCheckpoint}} operation fails on a 
> {{TaskManager}} potentially created checkpoint files are not cleaned up. This 
> should be changed.





[jira] [Commented] (FLINK-5090) Expose optionally detailed metrics about network queue lengths

2016-11-19 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15679596#comment-15679596
 ] 

Xiaogang Shi commented on FLINK-5090:
-

In Flink, performance bottlenecks are usually caused by
1. mismatched parallelism of the producer and consumer operators, or
2. imbalanced load across the different tasks of the same operator.

The metrics of all channels help a lot in diagnosing both problems,
but the solution to the second problem usually requires modifying the
application logic.

The gate-wise metrics are sufficient to identify the first problem,
and I think they add little overhead, since an operator has at most two input 
gates.






> Expose optionally detailed metrics about network queue lengths
> --
>
> Key: FLINK-5090
> URL: https://issues.apache.org/jira/browse/FLINK-5090
> Project: Flink
>  Issue Type: New Feature
>  Components: Network
>Affects Versions: 1.1.3
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0, 1.1.4
>
>
> For debugging purposes, it is important to have access to more detailed 
> metrics about the length of network input and output queues.





[jira] [Commented] (FLINK-5090) Expose optionally detailed metrics about network queue lengths

2016-11-17 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15675401#comment-15675401
 ] 

Xiaogang Shi commented on FLINK-5090:
-

I suggest making the metrics channel-wise. With channel-wise metrics, we can 
easily find the hotspot channels.

What do you think?

> Expose optionally detailed metrics about network queue lengths
> --
>
> Key: FLINK-5090
> URL: https://issues.apache.org/jira/browse/FLINK-5090
> Project: Flink
>  Issue Type: New Feature
>  Components: Network
>Affects Versions: 1.1.3
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0, 1.1.4
>
>
> For debugging purposes, it is important to have access to more detailed 
> metrics about the length of network input and output queues.





[jira] [Created] (FLINK-5086) Clean dead snapshot files produced by the tasks failing to acknowledge checkpoints

2016-11-17 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-5086:
---

 Summary: Clean dead snapshot files produced by the tasks failing 
to acknowledge checkpoints
 Key: FLINK-5086
 URL: https://issues.apache.org/jira/browse/FLINK-5086
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Xiaogang Shi


A task may fail while performing a checkpoint. In that case, the task may have 
already copied some data to external storage. But since the task fails to send 
the state handle to the {{CheckpointCoordinator}}, the copied data will not be 
deleted by the {{CheckpointCoordinator}}. 

I think we must find a method to clean up such dead snapshot data to avoid 
unbounded usage of external storage. 

One possible method is to clean these dead files when the task recovers. When a 
task recovers, the {{CheckpointCoordinator}} will tell the task all the 
retained checkpoints. The task can then scan the external storage and delete 
all the snapshots that are not part of these retained checkpoints.







[jira] [Comment Edited] (FLINK-5053) Incremental / lightweight snapshots for checkpoints

2016-11-17 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15673253#comment-15673253
 ] 

Xiaogang Shi edited comment on FLINK-5053 at 11/17/16 9:41 AM:
---

[~srichter] Do you have a more detailed plan for incremental checkpoints? 

I think much more work is needed to make them work. One big problem is the 
concurrent modification made by TaskExecutors and the JobMaster.

Currently, the state handles as well as the snapshot data (the files on HDFS) 
are both deleted by the JobMaster. In incremental checkpoints, a file may be 
used in different checkpoints, and concurrent access to the files may lead to 
incorrect results. For example, the JobMaster may delete a file that the 
TaskExecutor assumed was already on HDFS and therefore did not copy onto HDFS 
again.

One method is to synchronize the access of the JobMaster and the TaskExecutors. 
Another solution, I think, is to let the TaskExecutors delete these snapshot 
files. That way, all access to the snapshot data is made by TaskExecutors, 
avoiding the need for synchronization.

Do you have any idea about this problem?


was (Author: xiaogang.shi):
[~srichter] Do you have a more detailed plan about incremental checkpoints? 

I think much more work is needed to make it. One big problem is the concurrent 
modification made by TaskExecutors and JobMaster.

Currently, the state handlers as well as the snapshot data (the files on HDFS) 
are both deleted by the JobMasters. In incremental checkpoints, a file may be 
used in different checkpoints. 

One method is to synchronize the access of JobMasters and TaskExecutors. 
Another solution, i think, is to let TaskExecutors delete these snapshot files.

Do you have any idea about this problem?

> Incremental / lightweight snapshots for checkpoints
> ---
>
> Key: FLINK-5053
> URL: https://issues.apache.org/jira/browse/FLINK-5053
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>
> There is currently basically no difference between savepoints and checkpoints 
> in Flink and both are created through exactly the same process.
> However, savepoints and checkpoints have a slightly different meaning which 
> we should take into account to keep Flink efficient:
> - Savepoints are (typically infrequently) triggered by the user to create a 
> state from which the application can be restarted, e.g. because Flink, some 
> code, or the parallelism needs to be changed.
> - Checkpoints are (typically frequently) triggered by the System to allow for 
> fast recovery in case of failure, but keeping the job/system unchanged.
> This means that savepoints and checkpoints can have different properties in 
> that:
> - Savepoint should represent a state of the application, where 
> characteristics of the job (e.g. parallelism) can be adjusted for the next 
> restart. One example for things that savepoints need to be aware of are 
> key-groups. Savepoints can potentially be a little more expensive than 
> checkpoints, because they are usually created a lot less frequently through 
> the user.
> - Checkpoints are frequently triggered by the system to allow for fast 
> failure recovery. However, failure recovery leaves all characteristics of the 
> job unchanged. Thus checkpoints do not have to be aware of those, e.g. think 
> again of key groups. Checkpoints should run faster than creating savepoints, 
> in particular it would be nice to have incremental checkpoints.
> For a first approach, I would suggest the following steps/changes:
> - In checkpoint coordination: differentiate between triggering checkpoints 
> and savepoints. Introduce properties for checkpoints that describe their set 
> of abilities, e.g. "is-key-group-aware", "is-incremental".
> - In state handle infrastructure: introduce state handles that reflect 
> incremental checkpoints and drop full key-group awareness, i.e. covering 
> folders instead of files and not having keygroup_id -> file/offset mapping, 
> but keygroup_range -> folder?
> - Backend side: We should start with RocksDB by reintroducing something 
> similar to semi-async snapshots, but using 
> BackupableDBOptions::setShareTableFiles(true) and transferring only new 
> incremental outputs to HDFS. Notice that using RocksDB's internal backup 
> mechanism is giving up on the information about individual key-groups. But as 
> explained above, this should be totally acceptable for checkpoints, while 
> savepoints should use the key-group-aware fully async mode. Of course we also 
> need to implement the ability to restore from both types of snapshots.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5053) Incremental / lightweight snapshots for checkpoints

2016-11-17 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15673253#comment-15673253
 ] 

Xiaogang Shi commented on FLINK-5053:
-

[~srichter] Do you have a more detailed plan about incremental checkpoints? 

I think much more work is needed to make it. One big problem is the concurrent 
modification made by TaskExecutors and JobMaster.

Currently, the state handles as well as the snapshot data (the files on HDFS) 
are both deleted by the JobMasters. In incremental checkpoints, a file may be 
used in different checkpoints. 

One method is to synchronize the accesses of JobMasters and TaskExecutors. 
Another solution, I think, is to let TaskExecutors delete these snapshot files.

Do you have any idea about this problem?

> Incremental / lightweight snapshots for checkpoints
> ---
>
> Key: FLINK-5053
> URL: https://issues.apache.org/jira/browse/FLINK-5053
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>
> There is currently basically no difference between savepoints and checkpoints 
> in Flink and both are created through exactly the same process.
> However, savepoints and checkpoints have a slightly different meaning which 
> we should take into account to keep Flink efficient:
> - Savepoints are (typically infrequently) triggered by the user to create a 
> state from which the application can be restarted, e.g. because Flink, some 
> code, or the parallelism needs to be changed.
> - Checkpoints are (typically frequently) triggered by the System to allow for 
> fast recovery in case of failure, but keeping the job/system unchanged.
> This means that savepoints and checkpoints can have different properties in 
> that:
> - Savepoint should represent a state of the application, where 
> characteristics of the job (e.g. parallelism) can be adjusted for the next 
> restart. One example for things that savepoints need to be aware of are 
> key-groups. Savepoints can potentially be a little more expensive than 
> checkpoints, because they are usually created a lot less frequently through 
> the user.
> - Checkpoints are frequently triggered by the system to allow for fast 
> failure recovery. However, failure recovery leaves all characteristics of the 
> job unchanged. Thus checkpoints do not have to be aware of those, e.g. think 
> again of key groups. Checkpoints should run faster than creating savepoints, 
> in particular it would be nice to have incremental checkpoints.
> For a first approach, I would suggest the following steps/changes:
> - In checkpoint coordination: differentiate between triggering checkpoints 
> and savepoints. Introduce properties for checkpoints that describe their set 
> of abilities, e.g. "is-key-group-aware", "is-incremental".
> - In state handle infrastructure: introduce state handles that reflect 
> incremental checkpoints and drop full key-group awareness, i.e. covering 
> folders instead of files and not having keygroup_id -> file/offset mapping, 
> but keygroup_range -> folder?
> - Backend side: We should start with RocksDB by reintroducing something 
> similar to semi-async snapshots, but using 
> BackupableDBOptions::setShareTableFiles(true) and transferring only new 
> incremental outputs to HDFS. Notice that using RocksDB's internal backup 
> mechanism is giving up on the information about individual key-groups. But as 
> explained above, this should be totally acceptable for checkpoints, while 
> savepoints should use the key-group-aware fully async mode. Of course we also 
> need to implement the ability to restore from both types of snapshots.





[jira] [Commented] (FLINK-5085) Execute CheckpointCoodinator's state discard calls asynchronously

2016-11-16 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15673026#comment-15673026
 ] 

Xiaogang Shi commented on FLINK-5085:
-

Great, this is what I have been thinking about in recent days. Our states are 
composed of thousands of files on HDFS, and it takes a long time to delete them 
sequentially. A dedicated executor will help improve the performance.
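The proposal can be sketched with a plain {{ExecutorService}}; {{AsyncDiscarder}} and {{DisposableHandle}} below are hypothetical stand-ins for the checkpoint coordinator and Flink's state handles:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Minimal sketch: hand state-handle disposal to a dedicated IO executor so
 * the calling (coordinator) thread is never blocked on file deletion.
 */
public class AsyncDiscarder {
    public interface DisposableHandle { void discardState() throws Exception; }

    private final ExecutorService ioExecutor = Executors.newFixedThreadPool(4);

    /** Returns immediately; deletions run concurrently on the IO pool. */
    public void discardAsync(List<? extends DisposableHandle> handles) {
        for (DisposableHandle handle : handles) {
            ioExecutor.execute(() -> {
                try {
                    handle.discardState();  // e.g. delete one file on HDFS
                } catch (Exception e) {
                    // best effort: report and continue with the other handles
                    System.err.println("Could not discard state: " + e);
                }
            });
        }
    }

    public void shutdown() throws InterruptedException {
        ioExecutor.shutdown();
        ioExecutor.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```

With thousands of files, the fixed-size pool also bounds the number of concurrent HDFS requests instead of flooding the NameNode.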



> Execute CheckpointCoodinator's state discard calls asynchronously
> -
>
> Key: FLINK-5085
> URL: https://issues.apache.org/jira/browse/FLINK-5085
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4
>
>
> The {{CheckpointCoordinator}} discards under certain circumstances pending 
> checkpoints or state handles. These discard operations can involve a blocking 
> IO operation if the underlying state handle refers to a file which has to be 
> deleted. In order to not block the calling thread, we should execute these 
> calls in a dedicated IO executor.





[jira] [Commented] (FLINK-5053) Incremental / lightweight snapshots for checkpoints

2016-11-14 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15663160#comment-15663160
 ] 

Xiaogang Shi commented on FLINK-5053:
-

I think it's better to use {{checkpoint}} instead of {{backup}} to perform 
incremental checkpointing of RocksDB. The {{checkpoint}} method creates hard 
links for all live files instead of copying them, which helps reduce the time 
spent in the synchronous part of the snapshot.

What do you think?
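The cost difference can be illustrated with plain JDK hard links. This is only an analogy for what RocksDB's checkpoint mechanism does for its SST files, not the RocksDB API itself:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Illustrates why hard-linking (as RocksDB's checkpoint does) is cheaper than
 * copying (as its backup engine may do) in the synchronous phase: creating a
 * hard link is a constant-time metadata operation, independent of file size.
 */
public class HardLinkDemo {
    /** "Snapshots" a file by hard-linking it into the checkpoint directory. */
    public static Path snapshotByLink(Path sstFile, Path checkpointDir) throws IOException {
        Files.createDirectories(checkpointDir);
        Path link = checkpointDir.resolve(sstFile.getFileName());
        // No data is copied: both paths now refer to the same inode.
        return Files.createLink(link, sstFile);
    }
}
```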

> Incremental / lightweight snapshots for checkpoints
> ---
>
> Key: FLINK-5053
> URL: https://issues.apache.org/jira/browse/FLINK-5053
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>
> There is currently basically no difference between savepoints and checkpoints 
> in Flink and both are created through exactly the same process.
> However, savepoints and checkpoints have a slightly different meaning which 
> we should take into account to keep Flink efficient:
> - Savepoints are (typically infrequently) triggered by the user to create a 
> state from which the application can be restarted, e.g. because Flink, some 
> code, or the parallelism needs to be changed.
> - Checkpoints are (typically frequently) triggered by the System to allow for 
> fast recovery in case of failure, but keeping the job/system unchanged.
> This means that savepoints and checkpoints can have different properties in 
> that:
> - Savepoint should represent a state of the application, where 
> characteristics of the job (e.g. parallelism) can be adjusted for the next 
> restart. One example for things that savepoints need to be aware of are 
> key-groups. Savepoints can potentially be a little more expensive than 
> checkpoints, because they are usually created a lot less frequently through 
> the user.
> - Checkpoints are frequently triggered by the system to allow for fast 
> failure recovery. However, failure recovery leaves all characteristics of the 
> job unchanged. Thus checkpoints do not have to be aware of those, e.g. think 
> again of key groups. Checkpoints should run faster than creating savepoints, 
> in particular it would be nice to have incremental checkpoints.
> For a first approach, I would suggest the following steps/changes:
> - In checkpoint coordination: differentiate between triggering checkpoints 
> and savepoints. Introduce properties for checkpoints that describe their set 
> of abilities, e.g. "is-key-group-aware", "is-incremental".
> - In state handle infrastructure: introduce state handles that reflect 
> incremental checkpoints and drop full key-group awareness, i.e. covering 
> folders instead of files and not having keygroup_id -> file/offset mapping, 
> but keygroup_range -> folder?
> - Backend side: We should start with RocksDB by reintroducing something 
> similar to semi-async snapshots, but using 
> BackupableDBOptions::setShareTableFiles(true) and transferring only new 
> incremental outputs to HDFS. Notice that using RocksDB's internal backup 
> mechanism is giving up on the information about individual key-groups. But as 
> explained above, this should be totally acceptable for checkpoints, while 
> savepoints should use the key-group-aware fully async mode. Of course we also 
> need to implement the ability to restore from both types of snapshots.





[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-09 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15653177#comment-15653177
 ] 

Xiaogang Shi commented on FLINK-5023:
-

[~aljoscha] [~StephanEwen] I have updated the PR. Now, `State` only provides a 
read-only accessor and a new interface called `UpdatableState` is added.
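A minimal sketch of that interface split might look as follows; the names follow the discussion ({{State}}, {{UpdatableState}}), but the exact signatures are illustrative, not the ones in the PR:

```java
/** Sketch of splitting state interfaces into read-only and updatable parts. */
public class StateInterfaces {
    /** Base interface: read-only access to the value under the current key. */
    public interface State<T> {
        T get();
        void clear();
    }

    /** States that additionally allow writes. */
    public interface UpdatableState<T> extends State<T> {
        void update(T value);
    }

    /** A trivial heap-backed, ValueState-like implementation for illustration. */
    public static class SimpleValueState<T> implements UpdatableState<T> {
        private T value;
        @Override public T get() { return value; }
        @Override public void update(T value) { this.value = value; }
        @Override public void clear() { value = null; }
    }
}
```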

> Add get() method in State interface
> ---
>
> Key: FLINK-5023
> URL: https://issues.apache.org/jira/browse/FLINK-5023
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, the only method provided by the State interface is `clear()`. I 
> think we should provide another method called `get()` to return the 
> structured value (e.g., value, list, or map) under the current key. 
> In fact, the functionality of `get()` has already been implemented in all 
> types of states: e.g., `value()` in ValueState and `get()` in ListState. The 
> modification to the interface can better abstract these states.





[jira] [Commented] (FLINK-5024) Add SimpleStateDescriptor to clarify the concepts

2016-11-09 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15652901#comment-15652901
 ] 

Xiaogang Shi commented on FLINK-5024:
-

I am very poor at English :( But I think "Simple" is more often used as the 
opposite of "Compound", as in simple interest vs. compound interest. 
"Primitive" is not as good because it usually describes the BASIC 
elements from which other things are formed.

Maybe we need some help from native speakers lol



> Add SimpleStateDescriptor to clarify the concepts
> -
>
> Key: FLINK-5024
> URL: https://issues.apache.org/jira/browse/FLINK-5024
> Project: Flink
>  Issue Type: Improvement
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, StateDescriptors accept two type arguments : the first one is the 
> type of the created state and the second one is the type of the values in the 
> states. 
> The concepts however are a little confusing here because in ListStates, the 
> arguments passed to the StateDescriptors are the types of the list elements 
> instead of the lists. It also makes the implementation of MapStates difficult.
> I suggest not to put the type serializer in StateDescriptors, making 
> StateDescriptors independent of the data structures of the values. 
> A new type of StateDescriptor named SimpleStateDescriptor can be provided to 
> abstract those states (namely ValueState, ReducingState and FoldingState) 
> whose states are not composited. 
> The states (e.g. ListStates and MapStates) can implement their own 
> descriptors according to their data structures. 





[jira] [Commented] (FLINK-5036) Perform the grouping of keys in restoring instead of checkpointing

2016-11-09 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650462#comment-15650462
 ] 

Xiaogang Shi commented on FLINK-5036:
-

The current implementation of both checkpointing and restoring requires 
iterating over all key-value pairs. When the states are very big (up to 
multiple GBs or TBs, which is common in our daily jobs), the performance is 
obviously unacceptable.

Let me explain my proposal in more detail. By not grouping kv pairs at 
checkpoint time, we avoid the iteration and can directly copy the RocksDB 
files onto HDFS. Of course, we should record all the key groups contained in 
the produced snapshot.

When restoring from snapshots, the master will give each task all the RocksDB 
instances that contain its assigned key groups. A task picks the keys assigned 
to it by scanning these instances; if all key groups in an instance are 
assigned to the task, the scan can be skipped entirely.

In most cases, where the degree of parallelism is unchanged, fast recovery can 
be achieved because states are restored by simply copying the files from 
HDFS. In all cases, the performance will be much better than the existing 
implementation, which needs costly iteration.
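The restore-side picking can be sketched as follows, modeling a snapshot as a plain map instead of a RocksDB instance, with the usual hash-based key-group assignment; the helper names are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of picking only the assigned key groups out of a snapshot at restore time. */
public class KeyGroupRestore {
    /** Key group of a key under the standard hash(key) mod maxParallelism scheme. */
    public static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /**
     * Picks from a snapshot only the entries whose key group lies in the
     * task's assigned range [start, end]. If the snapshot's key groups were
     * known to all fall inside the range, the whole snapshot could be taken
     * as-is (the fast path when parallelism is unchanged).
     */
    public static <V> Map<String, V> pickAssigned(
            Map<String, V> snapshot, int maxParallelism, int start, int end) {
        Map<String, V> restored = new HashMap<>();
        for (Map.Entry<String, V> e : snapshot.entrySet()) {
            int group = keyGroup(e.getKey(), maxParallelism);
            if (group >= start && group <= end) {
                restored.put(e.getKey(), e.getValue());
            }
        }
        return restored;
    }
}
```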



> Perform the grouping of keys in restoring instead of checkpointing
> --
>
> Key: FLINK-5036
> URL: https://issues.apache.org/jira/browse/FLINK-5036
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>
> Whenever taking snapshots of {{RocksDBKeyedStateBackend}}, the values in the 
> states will be written onto different files according to their key groups. 
> The procedure is very costly when the states are very big. 
> Given that the snapshot operations will be performed much more frequently 
> than restoring, we can leave the key groups as they are to improve the 
> overall performance. In other words, we can perform the grouping of keys in 
> restoring instead of in checkpointing.
> I think, the implementation will be very similar to the restoring of 
> non-partitioned states. Each task will receive a collection of snapshots each 
> of which contains a set of key groups. Each task will restore its states from 
> the given snapshots by picking values in assigned key groups.





[jira] [Created] (FLINK-5036) Perform the grouping of keys in restoring instead of checkpointing

2016-11-08 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-5036:
---

 Summary: Perform the grouping of keys in restoring instead of 
checkpointing
 Key: FLINK-5036
 URL: https://issues.apache.org/jira/browse/FLINK-5036
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Xiaogang Shi


Whenever taking snapshots of {{RocksDBKeyedStateBackend}}, the values in the 
states will be written onto different files according to their key groups. The 
procedure is very costly when the states are very big. 

Given that the snapshot operations will be performed much more frequently than 
restoring, we can leave the key groups as they are to improve the overall 
performance. In other words, we can perform the grouping of keys in restoring 
instead of in checkpointing.

I think, the implementation will be very similar to the restoring of 
non-partitioned states. Each task will receive a collection of snapshots each 
of which contains a set of key groups. Each task will restore its states from 
the given snapshots by picking values in assigned key groups.






[jira] [Comment Edited] (FLINK-5023) Add get() method in State interface

2016-11-08 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649615#comment-15649615
 ] 

Xiaogang Shi edited comment on FLINK-5023 at 11/9/16 2:52 AM:
--

[~aljoscha] In that case, I think we should provide {{UpdatableState}} instead 
of {{ReadableState}}.  

Now {{State}} is updatable because it has the method {{clear()}}. I would 
prefer that {{ReadableState}} inherit from {{State}}; otherwise, it's very weird 
that a readable state is not a state. 

From this point, I think the base {{State}} should be readable, and a new 
interface {{UpdatableState}} could be provided to abstract those states 
allowing both reads and writes.


was (Author: xiaogang.shi):
@Aljoscha Krettek  In that case, i think we should provide {{UpdatableState}} 
instead of {{ReadableState}}.  

Now {{State}} is updatable because it has the method {{clear()}}. I would 
prefer {{ReadableState}} inherits from {{State}}; otherwise, it's very weird 
that a readable state is not a state. 

From this point, i think the base {{State}} should be readable, and a new 
interface {{UpdatableState}} could be provided to abstract those states 
allowing both reads and writes.

> Add get() method in State interface
> ---
>
> Key: FLINK-5023
> URL: https://issues.apache.org/jira/browse/FLINK-5023
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, the only method provided by the State interface is `clear()`. I 
> think we should provide another method called `get()` to return the 
> structured value (e.g., value, list, or map) under the current key. 
> In fact, the functionality of `get()` has already been implemented in all 
> types of states: e.g., `value()` in ValueState and `get()` in ListState. The 
> modification to the interface can better abstract these states.





[jira] [Comment Edited] (FLINK-5023) Add get() method in State interface

2016-11-08 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649615#comment-15649615
 ] 

Xiaogang Shi edited comment on FLINK-5023 at 11/9/16 2:46 AM:
--

@Aljoscha Krettek  In that case, I think we should provide {{UpdatableState}} 
instead of {{ReadableState}}.  

Now {{State}} is updatable because it has the method {{clear()}}. I would 
prefer that {{ReadableState}} inherit from {{State}}; otherwise, it's very weird 
that a readable state is not a state. 

From this point, I think the base {{State}} should be readable, and a new 
interface {{UpdatableState}} could be provided to abstract those states 
allowing both reads and writes.


was (Author: xiaogang.shi):
@Aljoscha Krettek  In that case, i think we should provide {{UpdatableState}} 
instead of {{ReadableState}}.  

Now {{State}} is updatable because it has the method {{clear()}}. If 
{{ReadableState}} does not inherit from {{State}}. I would prefer 
{{ReadableState}} inherits from {{State}}; otherwise, it's very weird that a 
readable state is not a state. 

From this point, i think the base {{State}} should be readable, and a new 
interface {{UpdatableState}} could be provided to abstract those states 
allowing both reads and writes.

> Add get() method in State interface
> ---
>
> Key: FLINK-5023
> URL: https://issues.apache.org/jira/browse/FLINK-5023
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, the only method provided by the State interface is `clear()`. I 
> think we should provide another method called `get()` to return the 
> structured value (e.g., value, list, or map) under the current key. 
> In fact, the functionality of `get()` has already been implemented in all 
> types of states: e.g., `value()` in ValueState and `get()` in ListState. The 
> modification to the interface can better abstract these states.





[jira] [Comment Edited] (FLINK-5023) Add get() method in State interface

2016-11-08 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649615#comment-15649615
 ] 

Xiaogang Shi edited comment on FLINK-5023 at 11/9/16 2:45 AM:
--

@Aljoscha Krettek  In that case, I think we should provide {{UpdatableState}} 
instead of {{ReadableState}}.  

Now {{State}} is updatable because it has the method {{clear()}}. If 
{{ReadableState}} does not inherit from {{State}}, it's very weird that a 
readable state is not a state, so I would prefer that {{ReadableState}} 
inherit from {{State}}. 

From this point, I think the base {{State}} should be readable, and a new 
interface {{UpdatableState}} could be provided to abstract those states 
allowing both reads and writes.


was (Author: xiaogang.shi):
Aljoscha Krettek  In that case, i think we should provide {{UpdatableState}} 
instead of {{ReadableState}}.  

Now {{State}} is updatable because it has the method {{clear()}}. If 
{{ReadableState}} does not inherit from {{State}}. I would prefer 
{{ReadableState}} inherits from {{State}}; otherwise, it's very weird that a 
readable state is not a state. 

From this point, i think the base {{State}} should be readable, and a new 
interface {{UpdatableState}} could be provided to abstract those states 
allowing both reads and writes.

> Add get() method in State interface
> ---
>
> Key: FLINK-5023
> URL: https://issues.apache.org/jira/browse/FLINK-5023
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, the only method provided by the State interface is `clear()`. I 
> think we should provide another method called `get()` to return the 
> structured value (e.g., value, list, or map) under the current key. 
> In fact, the functionality of `get()` has already been implemented in all 
> types of states: e.g., `value()` in ValueState and `get()` in ListState. The 
> modification to the interface can better abstract these states.





[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-08 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649615#comment-15649615
 ] 

Xiaogang Shi commented on FLINK-5023:
-

Aljoscha Krettek  In that case, I think we should provide {{UpdatableState}} 
instead of {{ReadableState}}.  

Now {{State}} is updatable because it has the method {{clear()}}. If 
{{ReadableState}} does not inherit from {{State}}, it's very weird that a 
readable state is not a state, so I would prefer that {{ReadableState}} 
inherit from {{State}}. 

From this point, I think the base {{State}} should be readable, and a new 
interface {{UpdatableState}} could be provided to abstract those states 
allowing both reads and writes.

> Add get() method in State interface
> ---
>
> Key: FLINK-5023
> URL: https://issues.apache.org/jira/browse/FLINK-5023
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, the only method provided by the State interface is `clear()`. I 
> think we should provide another method called `get()` to return the 
> structured value (e.g., value, list, or map) under the current key. 
> In fact, the functionality of `get()` has already been implemented in all 
> types of states: e.g., `value()` in ValueState and `get()` in ListState. The 
> modification to the interface can better abstract these states.





[jira] [Assigned] (FLINK-5024) Add SimpleStateDescriptor to clarify the concepts

2016-11-07 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi reassigned FLINK-5024:
---

Assignee: Xiaogang Shi

> Add SimpleStateDescriptor to clarify the concepts
> -
>
> Key: FLINK-5024
> URL: https://issues.apache.org/jira/browse/FLINK-5024
> Project: Flink
>  Issue Type: Improvement
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, StateDescriptors accept two type arguments : the first one is the 
> type of the created state and the second one is the type of the values in the 
> states. 
> The concepts however are a little confusing here because in ListStates, the 
> arguments passed to the StateDescriptors are the types of the list elements 
> instead of the lists. It also makes the implementation of MapStates difficult.
> I suggest not to put the type serializer in StateDescriptors, making 
> StateDescriptors independent of the data structures of the values. 
> A new type of StateDescriptor named SimpleStateDescriptor can be provided to 
> abstract those states (namely ValueState, ReducingState and FoldingState) 
> whose states are not composited. 
> The states (e.g. ListStates and MapStates) can implement their own 
> descriptors according to their data structures. 





[jira] [Assigned] (FLINK-5023) Add get() method in State interface

2016-11-07 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi reassigned FLINK-5023:
---

Assignee: Xiaogang Shi

> Add get() method in State interface
> ---
>
> Key: FLINK-5023
> URL: https://issues.apache.org/jira/browse/FLINK-5023
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, the only method provided by the State interface is `clear()`. I 
> think we should provide another method called `get()` to return the 
> structured value (e.g., value, list, or map) under the current key. 
> In fact, the functionality of `get()` has already been implemented in all 
> types of states: e.g., `value()` in ValueState and `get()` in ListState. The 
> modification to the interface can better abstract these states.





[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-07 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15646299#comment-15646299
 ] 

Xiaogang Shi commented on FLINK-5023:
-

I have opened a PR: https://github.com/apache/flink/pull/2768/files. 

Since the implementations of `State` and `StateDescriptor` are closely 
connected, I also put the code of FLINK-5024 in this PR. 

Existing code may be affected by the changes in `StateDescriptor` because 
`StateDescriptor` now accepts only one type argument.

> Add get() method in State interface
> ---
>
> Key: FLINK-5023
> URL: https://issues.apache.org/jira/browse/FLINK-5023
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>
> Currently, the only method provided by the State interface is `clear()`. I 
> think we should provide another method called `get()` to return the 
> structured value (e.g., value, list, or map) under the current key. 
> In fact, the functionality of `get()` has already been implemented in all 
> types of states: e.g., `value()` in ValueState and `get()` in ListState. The 
> modification to the interface can better abstract these states.





[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-07 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15643925#comment-15643925
 ] 

Xiaogang Shi commented on FLINK-5023:
-

The only old method affected is the `value()` method in ValueState. All other 
states have already implemented the `get()` method. We can implement 
`ValueState#value()` by wrapping the `get()` method to avoid any changes to 
existing code. 

The introduction of ReadableState would work, but I think the additional 
interfaces will make the code "verbose" :)

> Add get() method in State interface
> ---
>
> Key: FLINK-5023
> URL: https://issues.apache.org/jira/browse/FLINK-5023
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>
> Currently, the only method provided by the State interface is `clear()`. I 
> think we should provide another method called `get()` to return the 
> structured value (e.g., value, list, or map) under the current key. 
> In fact, the functionality of `get()` has already been implemented in all 
> types of states: e.g., `value()` in ValueState and `get()` in ListState. The 
> modification to the interface can better abstract these states.





[jira] [Comment Edited] (FLINK-4856) Add MapState for keyed streams

2016-11-06 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642868#comment-15642868
 ] 

Xiaogang Shi edited comment on FLINK-4856 at 11/7/16 2:29 AM:
--

I have started the implementation of MapStates. But prior to that, I think 
we need some modifications to the current implementation to clarify the 
concepts. I have opened two JIRA issues for these problems; see FLINK-5023 and 
FLINK-5024 for the details.  


was (Author: xiaogang.shi):
I have started the implementation of MapStates. But at prior to that, I think 
we need some modification to currently framework to clarify the concepts. I 
have started two JIRA to state these problems. You may see FLINK-5023 and 
FLINK-5024 for the details.  

> Add MapState for keyed streams
> --
>
> Key: FLINK-4856
> URL: https://issues.apache.org/jira/browse/FLINK-4856
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Many states in keyed streams are organized as key-value pairs. Currently, 
> these states are implemented by storing the entire map into a ValueState or a 
> ListState. The implementation however is very costly because all entries have 
> to be serialized/deserialized when updating a single entry. To improve the 
> efficiency of these states, MapStates are urgently needed. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4856) Add MapState for keyed streams

2016-11-06 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15642868#comment-15642868
 ] 

Xiaogang Shi commented on FLINK-4856:
-

I have started the implementation of MapStates. But prior to that, I think we 
need some modifications to the current framework to clarify the concepts. I 
have opened two JIRAs to describe these problems. See FLINK-5023 and 
FLINK-5024 for the details.  

> Add MapState for keyed streams
> --
>
> Key: FLINK-4856
> URL: https://issues.apache.org/jira/browse/FLINK-4856
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Many states in keyed streams are organized as key-value pairs. Currently, 
> these states are implemented by storing the entire map into a ValueState or a 
> ListState. The implementation however is very costly because all entries have 
> to be serialized/deserialized when updating a single entry. To improve the 
> efficiency of these states, MapStates are urgently needed. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5024) Add SimpleStateDescriptor to clarify the concepts

2016-11-06 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-5024:
---

 Summary: Add SimpleStateDescriptor to clarify the concepts
 Key: FLINK-5024
 URL: https://issues.apache.org/jira/browse/FLINK-5024
 Project: Flink
  Issue Type: Improvement
Reporter: Xiaogang Shi


Currently, StateDescriptors accept two type arguments: the first is the type of 
the created state and the second is the type of the values in the state. 

The concept, however, is a little confusing here because for ListStates, the 
arguments passed to the StateDescriptors are the types of the list elements 
instead of the lists themselves. It also makes the implementation of MapStates 
difficult.

I suggest not putting the type serializer in StateDescriptors, making 
StateDescriptors independent of the data structures of the values. 

A new type of StateDescriptor named SimpleStateDescriptor can be provided to 
abstract those states (namely ValueState, ReducingState, and FoldingState) 
whose values are not composite. 

Composite states (e.g., ListStates and MapStates) can implement their own 
descriptors according to their data structures. 
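The proposed split can be sketched like this. The class and field names are illustrative only (this predates, and need not match, Flink's final API), and `Class<T>` stands in for a real `TypeSerializer<T>`: the base descriptor carries no value serializer, so composite states are free to describe their own element types.

```java
// Hypothetical sketch of the descriptor hierarchy proposed above.
abstract class StateDescriptor {
    final String name;

    StateDescriptor(String name) {
        this.name = name;
    }
}

// For non-composite states (ValueState, ReducingState, FoldingState):
// a single type for the single stored value.
class SimpleStateDescriptor<T> extends StateDescriptor {
    final Class<T> valueType;

    SimpleStateDescriptor(String name, Class<T> valueType) {
        super(name);
        this.valueType = valueType;
    }
}

// A MapState needs both a key type and a value type, which is awkward to
// express when the base descriptor insists on one value serializer.
class MapStateDescriptor<K, V> extends StateDescriptor {
    final Class<K> keyType;
    final Class<V> valueType;

    MapStateDescriptor(String name, Class<K> keyType, Class<V> valueType) {
        super(name);
        this.keyType = keyType;
        this.valueType = valueType;
    }
}
```

The design choice is that type information lives with the concrete descriptor, so adding a new composite state never requires bending the base class.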



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5023) Add get() method in State interface

2016-11-06 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-5023:
---

 Summary: Add get() method in State interface
 Key: FLINK-5023
 URL: https://issues.apache.org/jira/browse/FLINK-5023
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Reporter: Xiaogang Shi


Currently, the only method provided by the State interface is `clear()`. I 
think we should provide another method called `get()` to return the structured 
value (e.g., value, list, or map) under the current key. 

In fact, the functionality of `get()` has already been implemented in all types 
of states: e.g., `value()` in ValueState and `get()` in ListState. The 
modification to the interface can better abstract these states.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4856) Add MapState for keyed streams

2016-10-19 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi reassigned FLINK-4856:
---

Assignee: Xiaogang Shi

> Add MapState for keyed streams
> --
>
> Key: FLINK-4856
> URL: https://issues.apache.org/jira/browse/FLINK-4856
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Many states in keyed streams are organized as key-value pairs. Currently, 
> these states are implemented by storing the entire map into a ValueState or a 
> ListState. The implementation however is very costly because all entries have 
> to be serialized/deserialized when updating a single entry. To improve the 
> efficiency of these states, MapStates are urgently needed. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4856) Add MapState for keyed streams

2016-10-18 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-4856:
---

 Summary: Add MapState for keyed streams
 Key: FLINK-4856
 URL: https://issues.apache.org/jira/browse/FLINK-4856
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Reporter: Xiaogang Shi


Many states in keyed streams are organized as key-value pairs. Currently, these 
states are implemented by storing the entire map in a ValueState or a 
ListState. This implementation, however, is very costly because all entries 
have to be serialized and deserialized when updating a single entry. To improve 
the efficiency of these states, MapStates are urgently needed. 
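The cost difference can be illustrated with a toy model (class names are invented for this sketch; serialization is simulated by map copies, not Flink's real serializers): a map stored in a ValueState must be rewritten wholesale on every update, while a first-class MapState can touch a single entry.

```java
import java.util.HashMap;
import java.util.Map;

// Emulates storing the whole map in a ValueState: every update
// "deserializes" the full map, mutates it, and "reserializes" it.
class ValueStateOfMap<K, V> {
    private Map<K, V> serialized = new HashMap<>();

    void put(K key, V value) {
        Map<K, V> copy = new HashMap<>(serialized); // deserialize everything
        copy.put(key, value);
        serialized = new HashMap<>(copy);           // serialize everything
    }

    V get(K key) {
        return new HashMap<>(serialized).get(key);  // deserialize everything
    }
}

// Emulates a first-class MapState: only the touched entry is accessed,
// so the cost of an update is independent of the map size.
class MapStateSketch<K, V> {
    private final Map<K, V> backend = new HashMap<>();

    void put(K key, V value) {
        backend.put(key, value);
    }

    V get(K key) {
        return backend.get(key);
    }
}
```

With N entries, each `put` on `ValueStateOfMap` is O(N) while each `put` on `MapStateSketch` is O(1), which is exactly the inefficiency the issue describes.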



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4448) Use Listeners to monitor execution status

2016-08-23 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi closed FLINK-4448.
---
Resolution: Fixed

The functionality will be implemented in FLINK-4457

https://issues.apache.org/jira/browse/FLINK-4457

> Use Listeners to monitor execution status
> -
>
> Key: FLINK-4448
> URL: https://issues.apache.org/jira/browse/FLINK-4448
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, JobMaster monitors the ExecutionGraph's job status and execution 
> state through Akka. Since the dependencies on Akka should be removed in the 
> refactoring, JobMaster will utilize JobStatusListener and 
> ExecutionStateListener to receive the notifications from ExecutionGraph.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4408) Submit Job and setup ExecutionGraph

2016-08-23 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434052#comment-15434052
 ] 

Xiaogang Shi commented on FLINK-4408:
-

I think it's okay to pull the logic about leadership out of the JobManager and 
let the component holding the JobManager take care of it.

But I think the JobManager still needs to handle the switches. The JobManager 
still needs a function to submit/execute the job when it is started by the 
component holding it (to be specific, the `submitJob` function in the old 
JobManager). The only difference made by pulling the logic out is that the 
function will be called by the component holding the JobManager rather than by 
the JobManager itself. 

If I understand your comment correctly, I think we should revise the 
implementation in FLINK-4400 so that the JobManager does not care about 
leadership. But the JobManager still needs to implement the methods to start 
and cancel the execution, which are implemented in this JIRA.

> Submit Job and setup ExecutionGraph
> ---
>
> Key: FLINK-4408
> URL: https://issues.apache.org/jira/browse/FLINK-4408
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Once granted the leadership, JM will start to execute the job.
> Most code remains the same except that 
> (1) In old implementation where JM manages the execution of multiple jobs, JM 
> has to load all submitted JobGraphs from SubmittedJobGraphStore and recover 
> them. Now that the components creating JM will be responsible for the 
> recovery of JobGraphs, JM will be created with submitted/recovered JobGraph, 
> without the need to load the JobGraph.
> (2) JM should not rely on Akka to listen on the updates of JobStatus and 
> Execution.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4448) Use Listeners to monitor execution status

2016-08-22 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-4448:
---

 Summary: Use Listeners to monitor execution status
 Key: FLINK-4448
 URL: https://issues.apache.org/jira/browse/FLINK-4448
 Project: Flink
  Issue Type: Sub-task
  Components: Cluster Management
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi


Currently, JobMaster monitors the ExecutionGraph's job status and execution 
state through Akka. Since the dependencies on Akka should be removed in the 
refactoring, JobMaster will utilize a JobStatusListener and an 
ExecutionStateListener to receive notifications from the ExecutionGraph.
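The listener pattern described here can be sketched as follows. The names are illustrative (Flink's actual `JobStatusListener` signature differs): instead of sending Akka messages, the ExecutionGraph synchronously invokes callbacks registered by the JobMaster.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback interface: invoked whenever the job's status changes.
interface JobStatusListener {
    void jobStatusChanges(String jobId, String newStatus);
}

// Minimal stand-in for the ExecutionGraph's notification side: it keeps a
// list of listeners and pushes every status transition to all of them.
class ExecutionGraphSketch {
    private final String jobId;
    private final List<JobStatusListener> listeners = new ArrayList<>();

    ExecutionGraphSketch(String jobId) {
        this.jobId = jobId;
    }

    void registerJobStatusListener(JobStatusListener listener) {
        listeners.add(listener);
    }

    void transitionTo(String newStatus) {
        // Notify every registered listener directly -- no actor system needed.
        for (JobStatusListener l : listeners) {
            l.jobStatusChanges(jobId, newStatus);
        }
    }
}
```

The JobMaster would register itself (or a small adapter) as a listener at job start, which removes the Akka dependency the issue targets.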



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4408) Submit Job and setup ExecutionGraph

2016-08-17 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi updated FLINK-4408:

Summary: Submit Job and setup ExecutionGraph  (was: JobSubmission)

> Submit Job and setup ExecutionGraph
> ---
>
> Key: FLINK-4408
> URL: https://issues.apache.org/jira/browse/FLINK-4408
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Once granted the leadership, JM will start to execute the job.
> Most code remains the same except that 
> (1) In old implementation where JM manages the execution of multiple jobs, JM 
> has to load all submitted JobGraphs from SubmittedJobGraphStore and recover 
> them. Now that the components creating JM will be responsible for the 
> recovery of JobGraphs, JM will be created with submitted/recovered JobGraph, 
> without the need to load the JobGraph.
> (2) JM should not rely on Akka to listen on the updates of JobStatus and 
> Execution.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4408) JobSubmission

2016-08-17 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-4408:
---

 Summary: JobSubmission
 Key: FLINK-4408
 URL: https://issues.apache.org/jira/browse/FLINK-4408
 Project: Flink
  Issue Type: Sub-task
  Components: Cluster Management
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi


Once granted the leadership, JM will start to execute the job.

Most code remains the same, except that:

(1) In the old implementation, where JM managed the execution of multiple 
jobs, JM had to load all submitted JobGraphs from the SubmittedJobGraphStore 
and recover them. Now that the components creating JM will be responsible for 
the recovery of JobGraphs, JM will be created with a submitted/recovered 
JobGraph, without the need to load the JobGraph.

(2) JM should not rely on Akka to listen for updates of JobStatus and 
Execution.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4400) Leadership Election among JobManagers

2016-08-16 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-4400:
---

 Summary: Leadership Election among JobManagers
 Key: FLINK-4400
 URL: https://issues.apache.org/jira/browse/FLINK-4400
 Project: Flink
  Issue Type: Sub-task
  Components: Cluster Management
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi


* All JobMasters are LeaderContenders.
* Once a JobMaster is initialized, the very first thing it has to do is start 
the leadership election service and contend for the leadership.
* A JobMaster starts to perform its functionality only when it is granted the 
leadership.
* If a JobMaster's leadership is revoked, it will cancel all running 
executions and release all acquired resources.
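The contender life cycle in the bullets above can be sketched as follows. The interface and class names are illustrative only (Flink's real `LeaderContender` also carries leader session IDs and addresses): the election service calls back into the JobMaster, which starts work on grant and tears it down on revocation.

```java
// Hypothetical contender interface: the election service invokes these
// callbacks when leadership is granted or revoked.
interface LeaderContender {
    void grantLeadership();
    void revokeLeadership();
}

// Minimal JobMaster stand-in implementing the life cycle from the bullets.
class JobMasterSketch implements LeaderContender {
    boolean running = false;

    @Override
    public void grantLeadership() {
        // Only now does the JobMaster start performing its functionality.
        running = true;
    }

    @Override
    public void revokeLeadership() {
        // Cancel all running executions and release acquired resources.
        running = false;
    }
}
```

A newly initialized JobMaster would first register with the election service and stay idle until `grantLeadership()` fires, which matches the second and third bullets.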




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)