[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2017-12-05 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Well... major runtime changes are coming with FLIP-6, FLIP-15 and FLIP-16, so I 
would suggest you watch those. Loop FT will be included in one of these, along 
with other loop redesign features.


---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2017-11-26 Thread dikei
Github user dikei commented on the issue:

https://github.com/apache/flink/pull/1668
  
Hi. Do we have any updates on this? :)


---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2017-03-27 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
sweet! thanks @StefanRRichter 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2017-03-27 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/1668
  
For raw operator state, override 
`AbstractStreamOperator::snapshotState(StateSnapshotContext context)` inside 
your operator. Your implementation should first call super; it can then obtain 
the raw stream via `context.getRawOperatorStateOutput()`. This stream works like 
a normal output stream, except that you can also call 
`stream.startNewPartition()`. This signals that a new partition is started and 
all previous partitions are finalized/immutable. Partitions are the atomic units 
of state redistribution; think of them as the individual elements in a 
`ListCheckpointed` state.

For restoring, override 
`AbstractStreamOperator::initializeState(StateInitializationContext context)`. 
After calling super, `context.getRawOperatorStateInputs()` provides an iterable 
with one input stream per partition that your operator should restore.
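The partition semantics described above can be modelled in plain Java without the Flink runtime. This is a hypothetical stand-in, not Flink's API: `PartitionedRawOutput`, `snapshot` and `restore` are invented names, and a real operator would obtain its streams from `context.getRawOperatorStateOutput()` and `context.getRawOperatorStateInputs()` instead. In this sketch each log slice is written as its own partition, so the restore side receives one input stream per slice.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of a partitioned raw state stream (NOT Flink's classes):
 * writes go to the open partition; startNewPartition() seals it for good.
 */
public class RawPartitionDemo {

    static final class PartitionedRawOutput {
        private final List<byte[]> sealed = new ArrayList<>();
        private ByteArrayOutputStream open; // null until the first partition starts

        /** Finalizes the previous partition (if any) and starts a new one. */
        void startNewPartition() {
            seal();
            open = new ByteArrayOutputStream();
        }

        ByteArrayOutputStream stream() {
            if (open == null) {
                throw new IllegalStateException("startNewPartition() not called yet");
            }
            return open;
        }

        /** Seals the last partition and returns all partitions in write order. */
        List<byte[]> finish() {
            seal();
            return sealed;
        }

        private void seal() {
            if (open != null) {
                sealed.add(open.toByteArray());
                open = null;
            }
        }
    }

    /** Snapshot side: one partition per log slice, the unit of redistribution. */
    public static List<byte[]> snapshot(List<List<Integer>> slices) {
        PartitionedRawOutput out = new PartitionedRawOutput();
        try {
            for (List<Integer> slice : slices) {
                out.startNewPartition();
                DataOutputStream data = new DataOutputStream(out.stream());
                data.writeInt(slice.size());
                for (int record : slice) {
                    data.writeInt(record);
                }
                data.flush();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.finish();
    }

    /** Restore side: one input stream per partition to be restored. */
    public static List<List<Integer>> restore(Iterable<InputStream> partitions) {
        List<List<Integer>> slices = new ArrayList<>();
        try {
            for (InputStream in : partitions) {
                DataInputStream data = new DataInputStream(in);
                int n = data.readInt();
                List<Integer> slice = new ArrayList<>(n);
                for (int i = 0; i < n; i++) {
                    slice.add(data.readInt());
                }
                slices.add(slice);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return slices;
    }

    public static void main(String[] args) {
        List<byte[]> raw = snapshot(List.of(List.of(1, 2), List.of(3)));
        List<InputStream> inputs = new ArrayList<>();
        for (byte[] partition : raw) {
            inputs.add(new ByteArrayInputStream(partition));
        }
        System.out.println(raw.size() + " partitions -> " + restore(inputs)); // 2 partitions -> [[1, 2], [3]]
    }
}
```

Because partitions are sealed once a new one starts, a redistribution step could hand whole partitions to different instances without ever splitting a slice.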


---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2017-03-27 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Thanks for the review @gyfora and @StephanEwen, these are very good points.

@StephanEwen it makes sense not to index or keep metadata for individual 
records in log slices; that is extra overhead. Writing raw operator state makes 
sense too, so I will do that once @StefanRRichter gives me some pointers; that 
would be great. 

Any redistribution of the checkpoint slices would violate causality, so I 
hope the "list redistribution pattern" actually keeps the set of registered 
operator states per instance intact. The garbage collection issue still remains, 
but maybe (if @StefanRRichter approves) I can add an `unregister` functionality 
to the `OperatorStateStore`.

I can also add preconfigured operators (not that they will be reused 
anywhere). It is cleaner, but I really need to see how I can get full control 
of the `task` checkpointing behaviour from the `operator` level (since the 
default task checkpointing behaviour is altered at the task level).




---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2017-03-24 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Hey. Any update/opinion/something anyone?
Just a gentle reminder, sorry if this sounds a bit desperate :)



---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2017-03-15 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
I have just rebased to the current master.
Please let me wrap this up. It has been over a year :)

Unregistering state in the OperatorStateStore is a very tiny fix.
@StephanEwen @StefanRRichter is it ok with you to make this small addition 
in this PR, or should I create a separate issue? I hope you have a spare few 
minutes to take a quick look this time.


---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2017-03-10 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Hey @addisonj. 
Sure! You could perhaps review the changes and maybe see how to discard 
empty operator states if you are motivated. This is the only pending issue for 
this PR. Thanks!


---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2017-03-09 Thread addisonj
Github user addisonj commented on the issue:

https://github.com/apache/flink/pull/1668
  
Very interested in this work. It sounds like there are a few loose ends and 
then some cleanup before it might be ready for merge. @senorcarbone or 
@StephanEwen, is there anything someone else could help with? Would love to 
help wherever possible.


---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2017-01-19 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
The last update implements a variant of what @StephanEwen proposes. We have 
put some more thought into this offline too, thanks to @gyfora! The idea is that 
instead of putting records into each `ListState`, the output log is partitioned 
into multiple log "slices", one per concurrent checkpoint.

More specifically, the `UpstreamLogger` operator at the `IterationHead` 
slices logs proportionally to the number of concurrent snapshots. This also 
allows committed output logs to be uniquely identified and cleared after each 
complete checkpoint. The design is based on the following assumptions:

- A slice is named after a checkpoint ID. Checkpoint IDs are numerically 
ordered within an execution.
- Checkpoint barriers arrive back in FIFO order, so we discard log slices in 
the same FIFO order.
- Upon restoration the logger sorts sliced logs in the same FIFO order and 
returns an Iterable that gives a single, unified view of the log.
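A minimal sketch of the slicing scheme, assuming exactly the three properties listed above (increasing checkpoint IDs, FIFO barrier return, sorted restore). `SlicedUpstreamLog` and its methods are hypothetical names, plain lists stand in for Flink state, and this is an illustration rather than the PR's actual `UpstreamLogger`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch of a sliced upstream log; the PR's actual UpstreamLogger may differ.
 * Slice keys are checkpoint IDs, which are assumed to increase within an execution.
 */
public class SlicedUpstreamLog<T> {
    private final TreeMap<Long, List<T>> slices = new TreeMap<>(); // ordered by checkpoint ID

    /** A checkpoint starts at the head: subsequent feedback records go into its slice. */
    public void startSlice(long checkpointId) {
        if (!slices.isEmpty() && checkpointId <= slices.lastKey()) {
            throw new IllegalArgumentException("checkpoint IDs must increase");
        }
        slices.put(checkpointId, new ArrayList<>());
    }

    /** Logs a feedback record in the newest slice; with no checkpoint in flight nothing is logged. */
    public void append(T record) {
        if (!slices.isEmpty()) {
            slices.lastEntry().getValue().add(record);
        }
    }

    /**
     * The barrier of checkpointId came back. Barriers return in FIFO order, so it must own the
     * oldest slice. Its log is that slice plus all younger ones; afterwards the oldest slice is
     * discarded because no other in-flight checkpoint needs it.
     */
    public Map<Long, List<T>> onBarrierReturned(long checkpointId) {
        if (slices.isEmpty() || slices.firstKey() != checkpointId) {
            throw new IllegalStateException("barriers must return in FIFO order");
        }
        Map<Long, List<T>> snapshot = new TreeMap<>();
        for (Map.Entry<Long, List<T>> slice : slices.entrySet()) {
            snapshot.put(slice.getKey(), new ArrayList<>(slice.getValue()));
        }
        slices.pollFirstEntry();
        return snapshot;
    }

    /** Restore: sort slices by checkpoint ID and concatenate them into a single view of the log. */
    public static <E> List<E> restore(Map<Long, List<E>> persisted) {
        List<E> log = new ArrayList<>();
        for (List<E> slice : new TreeMap<>(persisted).values()) {
            log.addAll(slice);
        }
        return log;
    }

    public static void main(String[] args) {
        SlicedUpstreamLog<String> log = new SlicedUpstreamLog<>();
        log.append("a");          // no checkpoint in flight: not logged (forwarding is outside this sketch)
        log.startSlice(1);
        log.append("b");
        log.startSlice(2);        // second, concurrent checkpoint
        log.append("c");
        Map<Long, List<String>> cp1 = log.onBarrierReturned(1);
        log.append("d");
        Map<Long, List<String>> cp2 = log.onBarrierReturned(2);
        System.out.println(restore(cp1) + " " + restore(cp2)); // [b, c] [c, d]
    }
}
```

In this sketch each record is stored once, in the newest slice; a checkpoint's log is the concatenation of its own slice and all younger slices at the moment its barrier returns.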

Before I polish this, we need to close a memory leak. The `clear` operation 
of `State` cleans the state under the registered id, but it does not seem to 
unregister the key itself. Does anyone have an idea how to unregister state 
properly? Hope this gets some attention so we can wrap it up; it's been too long :)



---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2016-12-23 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
@StephanEwen could you check my question above? 


---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2016-12-16 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Working on it atm. I decided to make the following optimisations but want 
to very quickly make sure that async checkpointing works the way I believe it 
does:
- Most importantly, I am changing the iteration head to always forward 
records. Their effects are not present in any in-progress snapshot anyway, so 
I should have done that from the very beginning. :)
- If `ListState` is checkpointed asynchronously (depending on the backend, I 
suppose), then the version that exists when the snapshot is taken will be 
persisted as a copy. That means we can apply mutations right away and reset 
the state right after invoking the snapshot, so that it starts from the 
beginning of the next in-progress snapshot (some indexing involved). That way 
we do not need to open new ListStates in the first place. Does this make sense?

@StephanEwen please correct me if I am wrong regarding the second point. I 
am just not very familiar with async snapshotting for `ListState` (the 
documentation is not clear to me on this). Mind also that I do not use the 
`CheckpointedAsynchronously` interface; it seems to be heading towards 
deprecation. Thanks!
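The second point above rests on an assumption the author asks to have confirmed: that an asynchronous snapshot captures a copy of the state, so the live state can be reset immediately. The plain-Java model below shows the idea only under that assumption; `CopyOnSnapshotLog` is a hypothetical class, the `persisted` list stands in for durable storage, and it does not describe how any particular Flink state backend behaves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical model of copy-on-snapshot; NOT how any particular Flink backend works. */
public class CopyOnSnapshotLog {
    private List<Integer> live = new ArrayList<>();
    private final List<List<Integer>> persisted = new ArrayList<>(); // stand-in for durable storage

    public void add(int record) {
        live.add(record);
    }

    public List<Integer> live() {
        return List.copyOf(live);
    }

    public List<List<Integer>> persisted() {
        synchronized (persisted) {
            return List.copyOf(persisted);
        }
    }

    /**
     * Synchronous part: hand the current contents to the snapshot and reset the live
     * state right away. Asynchronous part: write the frozen contents in the background.
     */
    public CompletableFuture<Void> snapshot(ExecutorService io) {
        List<Integer> frozen = live;  // ownership moves to the snapshot
        live = new ArrayList<>();     // later mutations cannot leak into it
        return CompletableFuture.runAsync(() -> {
            synchronized (persisted) {
                persisted.add(List.copyOf(frozen));
            }
        }, io);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService io = Executors.newSingleThreadExecutor();
        CopyOnSnapshotLog log = new CopyOnSnapshotLog();
        log.add(1);
        log.add(2);
        CompletableFuture<Void> done = log.snapshot(io);
        log.add(3);                   // mutation applied right after invoking the snapshot
        done.get();                   // wait for the background write
        io.shutdown();
        System.out.println(log.persisted() + " live=" + log.live()); // [[1, 2]] live=[3]
    }
}
```

Swapping in a fresh list instead of copying the old one keeps the synchronous part cheap; whether a real backend gives the same isolation is exactly the open question in the comment.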


---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2016-12-13 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
agreed @StephanEwen! I will do that. 


---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2016-12-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/1668
  
@senorcarbone I agree, let's fix the multiple checkpoints issue and do the 
rest in FLIP-15.

The other operators have a pretty simple way of doing this:
  - for synchronous checkpointed operators, no need to do anything, the 
synchronous part of one checkpoint is over when the next starts (because it is 
synchronous ;-))
  - for asynchronously checkpointed state, the state backend needs to be 
able to hold multiple snapshots, which are saved by multiple background threads
  - none of the operators deal with in-flight data, which makes their job 
easy

Dealing with in-flight data probably means that you need to open a 
ListState for each checkpoint that arrives and add the fed-back values to each 
state, until that particular checkpoint's barrier comes back through the 
feedback channel. I think that should be sufficient.
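The per-checkpoint approach described above could look roughly like this. It is a sketch with hypothetical names, using plain lists in place of `ListState`: every fed-back record is appended to the buffer of each checkpoint still in flight, and a buffer is closed when that checkpoint's barrier comes back through the feedback channel.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: one buffer per in-flight checkpoint (plain lists stand in for ListState). */
public class PerCheckpointFeedbackBuffers<T> {
    private final Map<Long, List<T>> open = new LinkedHashMap<>();

    /** The head starts checkpoint checkpointId: open a fresh buffer for it. */
    public void onCheckpointStart(long checkpointId) {
        open.put(checkpointId, new ArrayList<>());
    }

    /** Each fed-back record is added to every checkpoint still waiting for its barrier. */
    public void onFeedbackRecord(T record) {
        for (List<T> buffer : open.values()) {
            buffer.add(record);
        }
    }

    /** The barrier for checkpointId came back through the feedback channel: its in-flight data is complete. */
    public List<T> onBarrierReturned(long checkpointId) {
        List<T> buffer = open.remove(checkpointId);
        if (buffer == null) {
            throw new IllegalStateException("unknown checkpoint " + checkpointId);
        }
        return buffer;
    }

    public static void main(String[] args) {
        PerCheckpointFeedbackBuffers<String> buffers = new PerCheckpointFeedbackBuffers<>();
        buffers.onCheckpointStart(7);
        buffers.onFeedbackRecord("a");
        buffers.onCheckpointStart(8);   // e.g. a savepoint concurrent to checkpoint 7
        buffers.onFeedbackRecord("b");
        System.out.println(buffers.onBarrierReturned(7)); // [a, b]
        buffers.onFeedbackRecord("c");
        System.out.println(buffers.onBarrierReturned(8)); // [b, c]
    }
}
```

With concurrent checkpoints a record is copied into several buffers ("b" above lands in both); the slice-based design discussed elsewhere in this thread avoids that duplication.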



---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2016-12-13 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
These are some good points @StephanEwen, thanks for checking it. 
How about the following, regarding each issue:

- `Concurrent Checkpoints`: It looks like an improvement, but I can surely do 
it in this PR if it is a crucial one. Can you elaborate a bit more, or point me 
to other operator state examples with concurrent checkpointing, to get an idea 
of how you want to do it?
- `Reconfiguration`: Sounds interesting... but I am not really aware of it 
from the devlist. If it is simple enough I could add support for it here. 
Otherwise I would suggest we address this in a separate JIRA and PR as an 
improvement. Is there a design document somewhere on how we plan to achieve 
reconfiguration and repartitioning for operator state specifically?
- `At-most-once blocking queue`: It is obvious from my previous comments 
that I do not approve of this part, but that is something we already got rid of 
in [FLIP-15](https://cwiki.apache.org/confluence/display/FLINK/FLIP-15+Scoped+Loops+and+Job+Termination) 
([this](https://github.com/FouadMA/flink/commit/9adaac435bcaf3552afe564c739d4e8fd79c433b) 
commit). How about we address this together with the deadlocks in FLIP-15?
- `Deadlocks`: I like the elastic spilling channel idea to resolve 
deadlocks. I need time to dig a bit more into this and make sure we actually 
solve deadlocks and not just mitigate them. Is it ok with you if we address that in 
[FLIP-15](https://cwiki.apache.org/confluence/display/FLINK/FLIP-15+Scoped+Loops+and+Job+Termination)? 
I need more time for this part; plus, we need to combine the absence of 
expiring queues with a proper termination algorithm (otherwise we just solve 
the deadlocks and the jobs never terminate).

What do you think?



---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2016-12-12 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/1668
  
To suggest a way to fix the guarantees: to my mind, the crux lies in the 
way that the feedback channel is implemented. A simple blocking queue just 
does not cut it for that case. To make this proper, I think we need to do the 
following:
  - Have an elastic feedback channel (unbounded) with a certain memory 
budget, that can spill if needed. I think it would be best implemented holding 
data serialized.
  - On checkpoint, one simply adds the feedback channel data (already 
bytes) to the checkpoint.
  - The source task should probably prioritize reading from the feedback 
channel, to keep it as small as possible at all times.
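The three points above can be sketched as a small plain-Java class. Everything here is an assumption for illustration: `ElasticFeedbackChannel` is a hypothetical name, the `spilled` deque only stands in for spill files on disk, and records are assumed to be serialized to `byte[]` before entering the channel.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of an elastic, unbounded feedback channel holding serialized records.
 * The "spilled" deque only stands in for disk; a real implementation would write files.
 */
public class ElasticFeedbackChannel {
    private final long memoryBudgetBytes;
    private final ArrayDeque<byte[]> inMemory = new ArrayDeque<>();
    private final ArrayDeque<byte[]> spilled = new ArrayDeque<>(); // stand-in for spill files
    private long inMemoryBytes = 0;

    public ElasticFeedbackChannel(long memoryBudgetBytes) {
        this.memoryBudgetBytes = memoryBudgetBytes;
    }

    /** Tail side: never blocks and never drops; overflow goes to the spill area. */
    public void offer(byte[] record) {
        // Once anything has spilled, keep spilling so that FIFO order is preserved.
        if (spilled.isEmpty() && inMemoryBytes + record.length <= memoryBudgetBytes) {
            inMemory.addLast(record);
            inMemoryBytes += record.length;
        } else {
            spilled.addLast(record);
        }
    }

    /** Head side: returns the oldest record, or null if the channel is empty. */
    public byte[] poll() {
        byte[] next = inMemory.pollFirst();
        if (next != null) {
            inMemoryBytes -= next.length;
            return next;
        }
        return spilled.pollFirst();
    }

    /** On checkpoint: the contents are already bytes, so they are copied as-is, in order. */
    public List<byte[]> snapshot() {
        List<byte[]> copy = new ArrayList<>(inMemory);
        copy.addAll(spilled);
        return copy;
    }

    public int spilledCount() {
        return spilled.size();
    }

    /** Head input selection: prefer feedback so the channel stays as small as possible. */
    public static byte[] nextInput(ElasticFeedbackChannel feedback, ArrayDeque<byte[]> source) {
        byte[] fromFeedback = feedback.poll();
        return fromFeedback != null ? fromFeedback : source.pollFirst();
    }

    public static byte[] ser(String s) { return s.getBytes(StandardCharsets.UTF_8); }
    public static String de(byte[] b) { return new String(b, StandardCharsets.UTF_8); }

    public static void main(String[] args) {
        ElasticFeedbackChannel channel = new ElasticFeedbackChannel(4); // tiny budget: 4 bytes
        channel.offer(ser("ab"));
        channel.offer(ser("cd"));
        channel.offer(ser("ef"));   // exceeds the budget, so it is spilled
        System.out.println(channel.spilledCount() + " " + channel.snapshot().size()); // 1 3
        ArrayDeque<byte[]> source = new ArrayDeque<>(List.of(ser("src")));
        StringBuilder order = new StringBuilder();
        byte[] next;
        while ((next = nextInput(channel, source)) != null) {
            order.append(de(next)).append(' ');
        }
        System.out.println(order.toString().trim()); // ab cd ef src
    }
}
```

New records keep going to the spill area as long as anything is spilled, which preserves FIFO order without having to interleave memory and disk contents.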



---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2016-12-12 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/1668
  
Thanks for the reminder, I went over the code today. The code looks mostly 
good, but here are some thoughts:

  - The head task supports only one concurrent checkpoint. In general, the 
tasks need to support multiple checkpoints being in progress at the same time. 
It frequently happens when people trigger savepoints concurrent to a running 
checkpoint. I think that is important to support.

  - The tail task offers the elements to the blocking queue. That means 
records are simply dropped if the capacity-bounded queue (one element) is not 
polled by the head task in time.

  - With the capacity bound in the feedback queue, it is pretty easy to 
build a full deadlock. Just use a loop function that explodes data into the 
feedback channel.

  - Recent code also introduced the ability to change parallelism. What are 
the semantics here when the parallelism of the loop is changed?
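The dropping behaviour in the second point follows from the `BlockingQueue.offer` contract: on a full bounded queue it returns `false` immediately instead of waiting. The standalone illustration below uses only `java.util.concurrent`; `BoundedFeedbackDrop` is a hypothetical name and this is not the Flink tail task's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Illustration (not Flink code): offer() on a full bounded queue fails instead of waiting. */
public class BoundedFeedbackDrop {
    /** Simulates a tail that offers every record while the head does not poll in time. */
    public static List<Integer> dropped(int capacity, int records) {
        BlockingQueue<Integer> feedback = new ArrayBlockingQueue<>(capacity);
        List<Integer> lost = new ArrayList<>();
        for (int r = 0; r < records; r++) {
            if (!feedback.offer(r)) {   // returns false immediately when the queue is full
                lost.add(r);            // a caller that ignores the result silently loses r
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        System.out.println(dropped(1, 3)); // [1, 2]
    }
}
```

Switching to `put()` would stop the loss, but `put()` waits until space is free, which, with a loop that emits more feedback than the head drains, turns into the deadlock described in the third point.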

Since loops did not support any fault tolerance guarantees, I guess this 
does improve recovery behavior. But as long as the loops can either deadlock or 
drop data, the hard guarantees are in the end still a bit weak. So that leaves 
me a bit ambivalent about what to do with this pull request.



---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2016-12-12 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Exactly, these two issues do not depend on each other. No doubt loop FT is 
the first thing that can enable iterations in a production deployment, so I 
would merge that first.

Thank you again Gyula for looking into it :)



---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2016-12-12 Thread gyfora
Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/1668
  
I think the PR looks pretty good, and it sounds fair to address 
termination in a later PR, as this will still greatly improve the current 
guarantees without making the backpressure/termination problems any worse.

+1 from me


---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2016-12-12 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Hey again, @StephanEwen @uce .
When you have 10 minutes, can you take a look to see if this is acceptable? 
I would not like to leave this here for months again; it has been out way 
too long already. 

There are only a few changes and they are straightforward, so I really 
encourage you to skim them at your earliest convenience. Thanks!


---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2016-11-22 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Ok, so I am progressing this a bit independently from the termination stuff, 
and then we rebase onto whichever PR is merged first. I just changed everything 
and rebased to the current master. 

Some notable changes:
- The `StreamIterationCheckpointingITCase` is now made deterministic: it 
fails once after the first successful checkpoint, and the job stops after 
everything has been recovered appropriately.
- I am now using ListState, which is supposed to work like a charm with the 
RocksDB file backend. Note that with the default in-memory backend there is a 
high chance of running into issues, given the low memory capacity it is given 
by default.
- One tricky part that could potentially be done better is the way I set the 
logger in the StreamIterationHead (I had to change the head op field access to 
`protected` in the OperatorChain).

Whenever you find time, go ahead and check it out. It passes my super-strict 
test, which is a good thing. :)


---


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2016-09-28 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Hey! Good to be back :) Let's fix this properly now that there is some time, 
as @StephanEwen recommended.
Together with @FouadMA, we are writing a FLIP to address major loop fixes, 
namely termination detection and fault tolerance. The termination 
implementation is already in good shape in my opinion, and you can find it 
[here](https://github.com/senorcarbone/flink/pull/2#pullrequestreview-1929918) 
in case you want to take an early look. The description in the FLIP will make 
clear how this works in detail. 

The FT update for loops will be rebased on top of the loop termination fix.
We hope that you will find this good too, and btw thanks for your patience :)


---