Re: [VOTE] Accept Stateful Functions into Apache Flink

2019-11-03 Thread Paris Carbone
+1 from me as well!
Looking forward to seeing stateful functions evolving within Flink.

> On 2 Nov 2019, at 18:00, Márton Balassi  wrote:
> 
> +1 (binding)
> 
> Thank you for proposing this contribution!
> 
> On Fri, Nov 1, 2019 at 2:46 PM Konstantin Knauf 
> wrote:
> 
>> +1 (non-binding)
>> 
>> Stateful Functions, already in its current initial release, simplifies the
>> development of event-driven applications on Flink quite significantly.
>> 
>> On Thu, Oct 31, 2019 at 9:24 AM Vijay Bhaskar 
>> wrote:
>> 
>>> +1 from me
>>> 
>>> Regards
>>> Bhaskar
>>> 
>>> On Thu, Oct 31, 2019 at 11:42 AM Gyula Fóra 
>> wrote:
>>> 
 +1 from me, this is a great addition to Flink!
 
 Gyula
 
 On Thu, Oct 31, 2019, 03:52 Yun Gao 
>>> wrote:
 
>+1 (non-binding)
>Many thanks for bringing this to the community!
> 
> 
> --
> From:jincheng sun 
> Send Time:2019 Oct. 31 (Thu.) 10:22
> To:dev 
> Cc:Vasiliki Kalavri 
> Subject:Re: [VOTE] Accept Stateful Functions into Apache Flink
> 
> big +1 (binding)
> 
> On Wed, 30 Oct 2019 at 23:45, Andrey Zagrebin wrote:
> 
>> Sorry, my +1 was non-binding; I had missed that this is a PMC vote rather
>> than a committer vote.
>> 
>> On Wed, Oct 30, 2019 at 4:43 PM Chesnay Schepler <
>> ches...@apache.org
 
>> wrote:
>> 
>>> +1 (binding)
>>> 
>>> On 30/10/2019 15:25, Vasiliki Kalavri wrote:
 +1 (binding) from me. I hope this is not too late :)
 
 Thank you for this great contribution!
 
 On Wed, 30 Oct 2019 at 14:45, Stephan Ewen 
 wrote:
 
> Thank you all for voting.
> 
> The voting period has passed, but only 13 PMC members have voted so far,
> that is less than 2/3rd of the PMCs (17 members).
> 
> I will take a few days to ping other members to vote, after that we will
> gradually lower the threshold as per the process to account for inactive
> members.
> 
> Best,
> Stephan
> 
> 
> 
> 
> On Tue, Oct 29, 2019 at 6:20 PM Seth Wiesman <
>>> sjwies...@gmail.com
> 
>>> wrote:
> 
>> +1 (non-binding)
>> 
>> Seth
>> 
>>> On Oct 23, 2019, at 9:31 PM, Jingsong Li <
 jingsongl...@gmail.com>
> wrote:
>>> +1 (non-binding)
>>> 
>>> Best,
>>> Jingsong Lee
>>> 
 On Wed, Oct 23, 2019 at 9:02 PM Yu Li 
 wrote:
 
 +1 (non-binding)
 
 Best Regards,
 Yu
 
 
> On Wed, 23 Oct 2019 at 16:56, Haibo Sun <
>> sunhaib...@163.com
 
>> wrote:
> 
> +1 (non-binding)
> 
> Best,
> Haibo
> 
> 
> At 2019-10-23 09:07:41, "Becket Qin" <
>> becket@gmail.com>
>> wrote:
>> +1 (binding)
>> 
>> Thanks,
>> 
>> Jiangjie (Becket) Qin
>> 
>> On Tue, Oct 22, 2019 at 11:44 PM Tzu-Li (Gordon) Tai <
 tzuli...@apache.org
>> wrote:
>> 
>>> +1 (binding)
>>> 
>>> Gordon
>>> 
>>> On Tue, Oct 22, 2019, 10:58 PM Zhijiang <
> wangzhijiang...@aliyun.com
>>> .invalid>
>>> wrote:
>>> 
 +1 (non-binding)
 
 Best,
 Zhijiang
 
 
 
>>> 
>> --
 From:Zhu Zhu 
 Send Time:2019 Oct. 22 (Tue.) 16:33
 To:dev 
 Subject:Re: [VOTE] Accept Stateful Functions into
>> Apache
> Flink
 
 +1 (non-binding)
 
 Thanks,
 Zhu Zhu
 
 On Tue, 22 Oct 2019 at 11:06 AM, Biao Liu wrote:
 
> +1 (non-binding)
> 
> Thanks,
> Biao /'bɪ.aʊ/
> 
> 
> 
>> On Tue, 22 Oct 2019 at 10:26, Jark Wu <
>>> imj...@gmail.com>
>>> wrote:
>> 
>> +1 (non-binding)
>> 
>> Best,
>> Jark
>> 
>> On Tue, 22 Oct 2019 at 09:38, Hequn Cheng <
>>> chenghe...@gmail.com
 wrote:
>>> +1 (non-binding)
>>> 
>>> Best, Hequn
>>> 
>>> On Tue, Oct 22, 2019 at 9:21 AM Dian Fu <
 dian0511...@gmail.com>

Re: Checkpointing under backpressure

2019-08-14 Thread Paris Carbone
Sure, I see. In cases where no periodic aligned snapshots are employed, this is 
the only option.

Two things that have not been highlighted enough so far about the proposed 
protocol (including in my mails):
- The Recovery/Reconfiguration strategy should strictly prioritise 
processing logged events before entering normal task input operation. Otherwise 
causality can be violated. This also means dataflow recovery is expected 
to be slower than the one employed on an aligned snapshot.
- Same as with state capture, markers should be forwarded upon the first 
marker received on input, no later than that. Otherwise we get duplicate side 
effects.
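
To make the first point concrete, here is a minimal Python sketch of the replay ordering on recovery. It is only an illustration under my own assumptions, not Flink code: `RecoveringTask`, `logged_events` and `live_input` are hypothetical names, and the operator "state" is simply the list of events processed so far.

```python
from collections import deque


class RecoveringTask:
    """Hypothetical task restored from an unaligned snapshot {state, logged events}."""

    def __init__(self, snapshot_state, logged_events):
        self.state = list(snapshot_state)    # restored operator state
        self.replay = deque(logged_events)   # in-flight events captured by the snapshot
        self.processed = []                  # events processed since recovery

    def process(self, event):
        # Toy processing: the state simply accumulates every event.
        self.state.append(event)
        self.processed.append(event)

    def run(self, live_input):
        # Drain the logged events first, in FIFO order ...
        while self.replay:
            self.process(self.replay.popleft())
        # ... and only then enter normal input operation.
        for event in live_input:
            self.process(event)
        return self.processed
```

For example, `RecoveringTask(["a"], ["m1", "m2"]).run(["x", "y"])` processes `m1` and `m2` before `x` and `y`, so the logged events can never be overtaken by newer input.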

Thanks for the great ideas so far.

Paris

> On 14 Aug 2019, at 14:33, Stephan Ewen  wrote:
> 
> Scaling with unaligned checkpoints might be a necessity.
> 
> Let's assume the job failed due to a lost TaskManager, but no new
> TaskManager becomes available.
> In that case we need to scale down based on the latest complete checkpoint,
> because we cannot produce a new checkpoint.
> 
> 
> On Wed, Aug 14, 2019 at 2:05 PM Paris Carbone 
> wrote:
> 
>> +1 I think we are on the same page Stephan.
>> 
>> Rescaling on unaligned checkpoint sounds challenging and a bit
>> unnecessary. No?
>> Why not sticking to aligned snapshots for live reconfiguration/rescaling?
>> It’s a pretty rare operation and it would simplify things by a lot.
>> Everything can be “staged” upon alignment including replacing channels and
>> tasks.
>> 
>> -Paris
>> 
>>> On 14 Aug 2019, at 13:39, Stephan Ewen  wrote:
>>> 
>>> Hi all!
>>> 
>>> Yes, the first proposal of "unaligned checkpoints" (probably two years back
>>> now) drew major inspiration from Chandy-Lamport, as did actually the
>>> original checkpointing algorithm.
>>> 
>>> "Logging data between first and last barrier" versus "barrier jumping over
>>> buffer and storing those buffers" is pretty much the same.
>>> However, there are a few nice benefits of the proposal of unaligned
>>> checkpoints over Chandy-Lamport.
>>> 
>>> *## Benefits of Unaligned Checkpoints*
>>> 
>>> (1) It is very similar to the original algorithm (can be seen as an
>>> optional feature purely in the network stack) and thus can share lots of
>>> code paths.
>>> 
>>> (2) Less data stored. If we make the "jump over buffers" part timeout based
>>> (for example barrier overtakes buffers if not flushed within 10ms) then
>>> checkpoints are in the common case of flowing pipelines aligned without
>>> in-flight data. Only back pressured cases store some in-flight data, which
>>> means we don't regress in the common case and only fix the back pressure
>>> case.
>>> 
>>> (3) Faster checkpoints. Chandy Lamport still waits for all barriers to
>>> arrive naturally, logging on the way. If data processing is slow, this can
>>> still take quite a while.
>>> 
>>> ==> I think both these points are strong reasons to not change the
>>> mechanism away from "trigger sources" and start with CL-style "trigger all".
>>> 
>>> 
>>> *## Possible ways to combine Chandy Lamport and Unaligned Checkpoints*
>>> 
>>> We can think about something like "take state snapshot on first barrier"
>>> and then store buffers until the other barriers arrive. Inside the network
>>> stack, barriers could still overtake and persist buffers.
>>> The benefit would be less latency increase in the channels which already
>>> have received barriers.
>>> However, as mentioned before, not prioritizing the inputs from which
>>> barriers are still missing can also have an adverse effect.
>>> 
>>> 
>>> *## Concerning upgrades*
>>> 
>>> I think it is a fair restriction to say that upgrades need to happen on
>>> aligned checkpoints. It is a rare enough operation.
>>> 
>>> 
>>> *## Concerning re-scaling (changing parallelism)*
>>> 
>>> We need to support that on unaligned checkpoints as well. There are several
>>> feature proposals about automatic scaling, especially down scaling in case
>>> of missing resources. The last snapshot might be a regular checkpoint, so
>>> all checkpoints need to support rescaling.
>>> 
>>> 
>>> *## Concerning end-to-end checkpoint duration and "trigger sources"
>> versus
>>> "trigg

Re: Checkpointing under backpressure

2019-08-14 Thread Paris Carbone
bout this more. Maybe as Paris is writing, we do not
>> need to block any channels at all, at least assuming credit-based flow
>> control. What should happen with the following checkpoint is
>> another question. Also, should we support concurrent checkpoints and
>> subsuming checkpoints as we do now? Maybe not…
>> 
>> Paris
>> 
>> Re
>> I. 2. a) and b) - yes, this would have to be taken into account
>> I. 2. c) and IV. 2. - without those, end to end checkpoint time will
>> probably be longer than it could be. It might affect external systems. For
>> example Kafka, which automatically times out lingering transactions, and
>> for us, the transaction time is equal to the time between two checkpoints.
>> 
>> II 1. - I’m confused. To make things straight. Flink is currently
>> snapshotting once it receives all of the checkpoint barriers from all of
>> the input channels and only then it broadcasts the checkpoint barrier down
>> the stream. And this is correct from exactly-once perspective.
>> 
>> As far as I understand, your proposal based on Chandy Lamport algorithm,
>> is snapshotting the state of the operator on the first checkpoint barrier,
>> which also looks correct to me.
>> 
>> III. 1. As I responded to Zhu Zhu, let me think a bit more about this.
>> 
>> V. Yes, we still need aligned checkpoints, as they are easier for state
>> migration and upgrades.
>> 
>> Piotrek
>> 
>>> On 14 Aug 2019, at 11:22, Paris Carbone  wrote:
>>> 
>>> Now I see a little more clearly what you have in mind. Thanks for the
>> explanation!
>>> There are a few intermixed concepts here, some having to do with
>>> correctness, some with performance.
>>> Before delving deeper I will just enumerate a few things to make myself
>> a little more helpful if I can.
>>> 
>>> I. Initiation
>>> -
>>> 
>>> 1. RPC to sources only is a less intrusive way to initiate snapshots
>> since you utilize better pipeline parallelism (only a small subset of tasks
>> is running progressively the protocol at a time, if snapshotting is async
>> the overall overhead might not even be observable).
>>> 
>>> 2. If we really want an RPC to all initiation take notice of the
>> following implications:
>>> 
>>>  a. (correctness) RPC calls are not guaranteed to arrive in every
>> task before a marker from a preceding task.
>>> 
>>>  b. (correctness) Either the RPC call OR the first arriving marker
>> should initiate the algorithm. Whichever comes first. If you only do it per
>> RPC call then you capture a "late" state that includes side effects of
>> already logged events.
>>> 
>>>  c. (performance) Lots of IO will be invoked at the same time on
>> the backend store from all tasks. This might lead to high congestion in
>> async snapshots.
>>> 
>>> II. Capturing State First
>>> -
>>> 
>>> 1. (correctness) Capturing state at the last marker sounds incorrect to
>> me (state contains side effects of already logged events based on the
>> proposed scheme). This results into duplicate processing. No?
>>> 
>>> III. Channel Blocking / "Alignment"
>>> ---
>>> 
>>> 1. (performance?) What is the added benefit? We don't want a "complete"
>>> transactional snapshot, async snapshots are purely for failure-recovery.
>>> Thus, I don't see why this needs to be imposed at the expense of
>>> performance/throughput. With the proposed scheme the whole dataflow anyway
>>> enters snapshotting/logging mode so tasks more or less snapshot
>>> concurrently.
>>> 
>>> IV Marker Bypassing
>>> ---
>>> 
>>> 1. (correctness) This leads to equivalent in-flight snapshots so, with
>>> some quick thinking, it looks correct. I will try to model this later and
>>> get back to you in case I find something wrong.
>>> 
>>> 2. (performance) It also sounds like a meaningful optimisation! I like
>> thinking of this as a push-based snapshot. i.e., the producing task somehow
>> triggers forward a consumer/channel to capture its state. By example
>> consider T1 -> |marker t1| -> T2.
>>> 
>>> V. Usage of "Async" Snapshots
>>> -
>>> 
>>> 1. Do you see this as a full replacement of "full" aligned
>> snapshots/savepoints? In my view async snapshots will be needed from time
>>

Re: Checkpointing under backpressure

2019-08-14 Thread Paris Carbone
Thanks for the responses. Things are starting to get a bit clearer for everyone now. 

@Zhuzhu overlapping unaligned snapshots should be aborted/avoided imho.
@Piotr point II, it was a little too quickly written, sorry about that.
Simply put, the following two approaches are equivalent for a valid checkpoint. 

1. A -> S -> F
2. S -> F -> L

Other variants introduce issues. For example:

3. L -> S (duplicate side effects)
4. L -> A -> S  (again duplicates)
5. S -> L -> A  (valid but unnecessary channel blocking)

Abbreviations
—

S: Capture state
L: Log pending channels
F: Forward Marker
A: Align markers by blocking non-pending channels
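
As a rough way to play with these orderings, here is a small Python sketch that encodes only the five cases listed above. It is not a general correctness check: `classify` is a hypothetical helper, and its two rules (logging before state capture causes duplicates; logging combined with alignment blocks channels needlessly) are my own condensed reading of those cases.

```python
def classify(order):
    """Classify an ordering of actions S, L, F, A given as a list of letters."""
    # Rule 1: logging pending channels before capturing state means the
    # captured state already contains side effects of logged events.
    if "L" in order and "S" in order and order.index("L") < order.index("S"):
        return "duplicate side effects"
    # Rule 2: doing both logging and alignment is valid, but the alignment
    # blocks channels without need once in-flight events are logged anyway.
    if "L" in order and "A" in order:
        return "valid but unnecessary channel blocking"
    return "valid"


if __name__ == "__main__":
    for order in (["A", "S", "F"], ["S", "F", "L"], ["L", "S"],
                  ["L", "A", "S"], ["S", "L", "A"]):
        print(" -> ".join(order), ":", classify(order))
```

Running it labels variants 1 and 2 as valid, 3 and 4 as producing duplicates, and 5 as valid but with unnecessary blocking, matching the list above.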

What I like about the Chandy Lamport approach (2.) initiated from sources is 
that:
- Snapshotting imposes no modification to normal processing. 
- Can exploit pipeline parallelism if started from the sources, 
following the natural processing order without concurrent operations to 
backends (think of long pipelines, e.g., 30 operators with high parallelism, 
e.g. 100)

I would love to see a design doc soon and discuss ideas there instead. Might 
not seem like it but I don’t fancy writing long and boring emails. :)

-Paris

> On 14 Aug 2019, at 13:10, Yun Gao  wrote:
> 
>Hi,
>   Many thanks for sharing the thoughts on unaligned checkpoints!
>
>  Another question regarding I.2.c (performance) by Paris: do we 
> always snapshot and broadcast the markers once the task receives the first 
> marker from the JM? If so, then we will always need to snapshot all the 
> records before the next barriers in all the input channels. However, I think 
> users may be able to tolerate a fixed interval for the checkpointing, and we 
> may postpone the snapshot and broadcast until a configurable fixed time has 
> passed. With such a postponement, jobs without back pressure could still avoid 
> most IO operations and storage overhead, and jobs with back pressure could 
> also finish the checkpoint in a fixed interval with less IO overhead.  
> 
> Best, 
> Yun
> 
> --
> From:Piotr Nowojski 
> Send Time:2019 Aug. 14 (Wed.) 18:38
> To:Paris Carbone 
> Cc:dev ; zhijiang ; Nico 
> Kruber 
> Subject:Re: Checkpointing under backpressure
> 
> Hi again,
> 
> Zhu Zhu, let me think about this more. Maybe, as Paris is writing, we do not 
> need to block any channels at all, at least assuming credit-based flow 
> control. What should happen with the following checkpoint is 
> another question. Also, should we support concurrent checkpoints and 
> subsuming checkpoints as we do now? Maybe not…
> 
> Paris
> 
> Re 
> I. 2. a) and b) - yes, this would have to be taken into account
> I. 2. c) and IV. 2. - without those, end to end checkpoint time will probably 
> be longer than it could be. It might affect external systems. For example 
> Kafka, which automatically times out lingering transactions, and for us, the 
> transaction time is equal to the time between two checkpoints.
> 
> II 1. - I’m confused. To make things straight. Flink is currently 
> snapshotting once it receives all of the checkpoint barriers from all of the 
> input channels and only then it broadcasts the checkpoint barrier down the 
> stream. And this is correct from exactly-once perspective. 
> 
> As far as I understand, your proposal based on Chandy Lamport algorithm, is 
> snapshotting the state of the operator on the first checkpoint barrier, which 
> also looks correct to me.
> 
> III. 1. As I responded to Zhu Zhu, let me think a bit more about this.
> 
> V. Yes, we still need aligned checkpoints, as they are easier for state 
> migration and upgrades. 
> 
> Piotrek
> 
> > On 14 Aug 2019, at 11:22, Paris Carbone  wrote:
> > 
> > Now I see a little more clearly what you have in mind. Thanks for the 
> > explanation!
> > There are a few intermixed concepts here, some having to do with correctness, 
> > some with performance.
> > Before delving deeper I will just enumerate a few things to make myself a 
> > little more helpful if I can.
> > 
> > I. Initiation
> > -
> > 
> > 1. RPC to sources only is a less intrusive way to initiate snapshots since 
> > you utilize better pipeline parallelism (only a small subset of tasks is 
> > running progressively the protocol at a time, if snapshotting is async the 
> > overall overhead might not even be observable).
> > 
> > 2. If we really want an RPC to all initiation take notice of the following 
> > implications:
> >  
> >  a. (correctness) RPC calls are not guaranteed to arrive in every task 
> > before a marker from a preceding t

Re: Checkpointing under backpressure

2019-08-14 Thread Paris Carbone
 starts logging all events of I2 (only) into a buffer M*
>>> - Also notice here that T does NOT block I1 as it does in aligned
>>> snapshots -
>>> 3) Eventually T receives barrier from I2 and stops recording events. Its
>>> asynchronously captured snapshot is now complete: {T*,M*}.
>>> Upon recovery all messages of M* should be replayed in FIFO order.
>>> 
>>> With this approach alignment does not create a deadlock situation since
>>> anyway 2.II happens asynchronously and messages can be logged as well
>>> asynchronously during the process of the snapshot. If there is
>>> back-pressure in a pipeline the cause is most probably not this algorithm.
>>> 
>>> Back to your observation, the answer : yes and no.  In your network model,
>>> I can see the logic of “logging” and “committing” a final snapshot being
>>> provided by the channel implementation. However, do mind that the first
>>> barrier always needs to go “all the way” to initiate the Chandy Lamport
>>> algorithm logic.
>>> 
>>> The above flow has been proven using temporal logic in my phd thesis in
>>> case you are interested about the proof.
>>> I hope this helps a little clarifying things. Let me know if there is any
>>> confusing point to disambiguate. I would be more than happy to help if I
>>> can.
>>> 
>>> Paris
>>> 
>>>> On 13 Aug 2019, at 13:28, Piotr Nowojski  wrote:
>>>> 
>>>> Thanks for the input. Regarding the Chandy-Lamport snapshots don’t you
>>> still have to wait for the “checkpoint barrier” to arrive in order to know
>>> when have you already received all possible messages from the upstream
>>> tasks/operators? So instead of processing the “in flight” messages (as the
>>> Flink is doing currently), you are sending them to an “observer”?
>>>> 
>>>> In that case, that’s sounds similar to “checkpoint barriers overtaking
>>> in flight records” (aka unaligned checkpoints). Just for us, the observer
>>> is a snapshot state.
>>>> 
>>>> Piotrek
>>>> 
>>>>> On 13 Aug 2019, at 13:14, Paris Carbone 
>>> wrote:
>>>>> 
>>>>> Interesting problem! Thanks for bringing it up Thomas.
>>>>> 
>>>>> Ignore/Correct me if I am wrong but I believe Chandy-Lamport snapshots
>>> [1] would help out solve this problem more elegantly without sacrificing
>>> correctness.
>>>>> - They do not need alignment, only (async) logging for in-flight
>>> records between the time the first barrier is processed until the last
>>> barrier arrives in a task.
>>>>> - They work fine for failure recovery as long as logged records are
>>> replayed on startup.
>>>>> 
>>>>> Flink’s “aligned” savepoints would probably be still necessary for
>>> transactional sink commits + any sort of reconfiguration (e.g., rescaling,
>>> updating the logic of operators to evolve an application etc.).
>>>>> 
>>>>> I don’t completely understand the “overtaking” approach but if you have
>>> a concrete definition I would be happy to check it out and help if I can!
>>>>> Mind that Chandy-Lamport essentially does this by logging things in
>>> pending channels in a task snapshot before the barrier arrives.
>>>>> 
>>>>> -Paris
>>>>> 
>>>>> [1] https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm
>>>>> 
>>>>>> On 13 Aug 2019, at 10:27, Piotr Nowojski  wrote:
>>>>>> 
>>>>>> Hi Thomas,
>>>>>> 
>>>>>> As Zhijiang has responded, we are now in the process of discussing how
>>> to address this issue and one of the solution that we are discussing is
>>> exactly what you are proposing: checkpoint barriers overtaking the in
>>> flight data and make the in flight data part of the checkpoint.
>>>>>> 
>>>>>> If everything works well, we will be able to present result of our
>>> discussions on the dev mailing list soon.
>>>>>> 
>>>>>> Piotrek
>>>>>> 
>>>>>>> On 12 Aug 2019, at 23:23, zhijiang 
>>> wrote:
>>>>>>> 
>>>>>>> Hi Thomas,
>>>>>>> 
>>>>>>> Thanks for proposing this concern. The barrier alignme

Re: Checkpointing under backpressure

2019-08-13 Thread Paris Carbone
Yes! It’s quite similar I think. Though mind that the devil is in the details, 
i.e., the temporal order in which actions are taken.

To clarify, let us say you have a task T with two input channels I1 and I2.
The Chandy Lamport execution flow is the following:

1) T receives barrier from  I1 and...
2)  ...the following three actions happen atomically
I )  T snapshots its state T*
II)  T forwards marker to its outputs
III) T starts logging all events of I2 (only) into a buffer M*
- Also notice here that T does NOT block I1 as it does in aligned snapshots -
3) Eventually T receives barrier from I2 and stops recording events. Its 
asynchronously captured snapshot is now complete: {T*,M*}. 
Upon recovery all messages of M* should be replayed in FIFO order.
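
In case it helps, here is a toy Python sketch of the flow above for a single snapshot. It is only an illustration under my own assumptions, not Flink code: the `Task` class and its method names are hypothetical, the operator state is a running sum, and steps I to III are "atomic" here only because the sketch is single-threaded.

```python
class Task:
    """Toy task T with named input channels, following steps 1) to 3) above."""

    def __init__(self, inputs):
        self.inputs = list(inputs)
        self.state = 0               # toy operator state: running sum of events
        self.snapshot_state = None   # T*, captured on the first barrier
        self.logged = []             # M*, events logged from pending channels
        self.pending = set()         # channels whose barrier has not arrived yet
        self.forwarded = []          # markers sent to the outputs
        self.complete = False

    def on_event(self, channel, value):
        # Events on channels still waiting for their barrier are logged,
        # but every event is processed as usual: no channel is blocked.
        if channel in self.pending:
            self.logged.append((channel, value))
        self.state += value

    def on_barrier(self, channel):
        if self.snapshot_state is None:
            # First barrier: (I) snapshot state, (II) forward the marker,
            # (III) start logging all other input channels.
            self.snapshot_state = self.state
            self.forwarded.append("barrier")
            self.pending = set(self.inputs) - {channel}
        else:
            # Later barrier: stop logging this channel.
            self.pending.discard(channel)
        if not self.pending:
            self.complete = True     # snapshot {T*, M*} is now complete
```

For example, with inputs `I1` and `I2`: after events 1 (I1) and 2 (I2), a barrier on I1 captures T* = 3; a later event 10 on I1 is processed but not logged, an event 5 on I2 is processed and logged, and the barrier on I2 completes the snapshot as {3, [("I2", 5)]}.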

With this approach alignment does not create a deadlock situation since anyway 
2.II happens asynchronously and messages can be logged as well asynchronously 
during the process of the snapshot. If there is back-pressure in a pipeline the 
cause is most probably not this algorithm.

Back to your observation, the answer : yes and no.  In your network model, I 
can see the logic of “logging” and “committing” a final snapshot being provided 
by the channel implementation. However, do mind that the first barrier always 
needs to go “all the way” to initiate the Chandy Lamport algorithm logic.

The above flow has been proven using temporal logic in my PhD thesis, in case 
you are interested in the proof.
I hope this helps a little clarifying things. Let me know if there is any 
confusing point to disambiguate. I would be more than happy to help if I can.

Paris

> On 13 Aug 2019, at 13:28, Piotr Nowojski  wrote:
> 
> Thanks for the input. Regarding the Chandy-Lamport snapshots don’t you still 
> have to wait for the “checkpoint barrier” to arrive in order to know when 
> have you already received all possible messages from the upstream 
> tasks/operators? So instead of processing the “in flight” messages (as the 
> Flink is doing currently), you are sending them to an “observer”?
> 
> In that case, that’s sounds similar to “checkpoint barriers overtaking in 
> flight records” (aka unaligned checkpoints). Just for us, the observer is a 
> snapshot state.
> 
> Piotrek
> 
>> On 13 Aug 2019, at 13:14, Paris Carbone  wrote:
>> 
>> Interesting problem! Thanks for bringing it up Thomas.
>> 
>> Ignore/Correct me if I am wrong but I believe Chandy-Lamport snapshots [1] 
>> would help out solve this problem more elegantly without sacrificing 
>> correctness.
>> - They do not need alignment, only (async) logging for in-flight records 
>> between the time the first barrier is processed until the last barrier 
>> arrives in a task.
>> - They work fine for failure recovery as long as logged records are replayed 
>> on startup.
>> 
>> Flink’s “aligned” savepoints would probably be still necessary for 
>> transactional sink commits + any sort of reconfiguration (e.g., rescaling, 
>> updating the logic of operators to evolve an application etc.).
>> 
>> I don’t completely understand the “overtaking” approach but if you have a 
>> concrete definition I would be happy to check it out and help if I can!
>> Mind that Chandy-Lamport essentially does this by logging things in pending 
>> channels in a task snapshot before the barrier arrives.
>> 
>> -Paris
>> 
>> [1] https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm
>> 
>>> On 13 Aug 2019, at 10:27, Piotr Nowojski  wrote:
>>> 
>>> Hi Thomas,
>>> 
>>> As Zhijiang has responded, we are now in the process of discussing how to 
>>> address this issue and one of the solution that we are discussing is 
>>> exactly what you are proposing: checkpoint barriers overtaking the in 
>>> flight data and make the in flight data part of the checkpoint.
>>> 
>>> If everything works well, we will be able to present result of our 
>>> discussions on the dev mailing list soon. 
>>> 
>>> Piotrek
>>> 
>>>> On 12 Aug 2019, at 23:23, zhijiang  
>>>> wrote:
>>>> 
>>>> Hi Thomas,
>>>> 
>>>> Thanks for raising this concern. Barrier alignment takes a long time 
>>>> in the backpressure case, which could cause several problems:
>>>> 1. Checkpoint timeout as you mentioned.
>>>> 2. The recovery cost is high upon failover, because much data needs to be 
>>>> replayed.
>>>> 3. The delay for commit-based sink is high in exactly-once.
>>>> 
>>>> For credit-based flow contr

Re: Checkpointing under backpressure

2019-08-13 Thread Paris Carbone
Interesting problem! Thanks for bringing it up Thomas.

Ignore/Correct me if I am wrong but I believe Chandy-Lamport snapshots [1] 
would help solve this problem more elegantly without sacrificing 
correctness.
- They do not need alignment, only (async) logging for in-flight records 
between the time the first barrier is processed until the last barrier arrives 
in a task.
- They work fine for failure recovery as long as logged records are replayed on 
startup.

Flink’s “aligned” savepoints would probably be still necessary for 
transactional sink commits + any sort of reconfiguration (e.g., rescaling, 
updating the logic of operators to evolve an application etc.).

I don’t completely understand the “overtaking” approach but if you have a 
concrete definition I would be happy to check it out and help if I can!
Mind that Chandy-Lamport essentially does this by logging things in pending 
channels in a task snapshot before the barrier arrives.

-Paris

[1] https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm 


> On 13 Aug 2019, at 10:27, Piotr Nowojski  wrote:
> 
> Hi Thomas,
> 
> As Zhijiang has responded, we are now in the process of discussing how to 
> address this issue and one of the solution that we are discussing is exactly 
> what you are proposing: checkpoint barriers overtaking the in flight data and 
> make the in flight data part of the checkpoint.
> 
> If everything works well, we will be able to present result of our 
> discussions on the dev mailing list soon. 
> 
> Piotrek
> 
>> On 12 Aug 2019, at 23:23, zhijiang  
>> wrote:
>> 
>> Hi Thomas,
>> 
>> Thanks for raising this concern. Barrier alignment takes a long time in 
>> the backpressure case, which could cause several problems:
>> 1. Checkpoint timeout as you mentioned.
>> 2. The recovery cost is high upon failover, because much data needs to be 
>> replayed.
>> 3. The delay for commit-based sink is high in exactly-once.
>> 
>> For credit-based flow control from release-1.5, the amount of in-flight 
>> buffers before barrier alignment is reduced, so we get some
>> benefit in terms of checkpoint speed.
>> 
>> In release-1.8, I guess we did not suspend the channels which already 
>> received the barrier in practice. But we did do a similar thing before
>> to speed up barrier alignment. I am not quite sure that release-1.8 
>> covers this feature. There were some relevant discussions under jira [1].
>> 
>> For release-1.10, the community is now discussing the feature of unaligned 
>> checkpoints, which is mainly for resolving the above concerns. The basic idea
>> is to let the barrier overtake the output/input buffer queue to speed up 
>> alignment, and to snapshot the input/output buffers as part of the checkpoint 
>> state. The details have not been confirmed yet and are still under 
>> discussion. We hope we can make some improvements for release-1.10.
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-8523
>> 
>> Best,
>> Zhijiang
>> --
>> From:Thomas Weise 
>> Send Time:2019 Aug. 12 (Mon.) 21:38
>> To:dev 
>> Subject:Checkpointing under backpressure
>> 
>> Hi,
>> 
>> One of the major operational difficulties we observe with Flink are
>> checkpoint timeouts under backpressure. I'm looking for both confirmation
>> of my understanding of the current behavior as well as pointers for future
>> improvement work:
>> 
>> Prior to introduction of credit based flow control in the network stack [1]
>> [2], checkpoint barriers would back up with the data for all logical
>> channels due to TCP backpressure. Since Flink 1.5, the buffers are
>> controlled per channel, and checkpoint barriers are only held back for
>> channels that have backpressure, while others can continue processing
>> normally. However, checkpoint barriers still cannot "overtake data",
>> therefore checkpoint alignment remains affected for the channel with
>> backpressure, with the potential for slow checkpointing and timeouts.
>> Albeit the delay of barriers would be capped by the maximum in-transit
>> buffers per channel, resulting in an improvement compared to previous
>> versions of Flink. Also, the backpressure based checkpoint alignment can
>> help the barrier advance faster on the receiver side (by suspending
>> channels that have already delivered the barrier). Is that accurate as of
>> Flink 1.8?
>> 
>> What appears to be missing to completely unblock checkpointing is a
>> mechanism for checkpoints to overtake the data. That would help in
>> situations where the processing itself is the bottleneck and prioritization
>> in the network stack alone cannot address the barrier delay. Was there any
>> related discussion? One possible solution would be to drain incoming data
>> till the barrier and make it part of the checkpoint instead of processing
>> it. This is somewhat related to asynchronous processing, but I'm thinking
>> 

Re: [Proposal] Utilities for reading, transforming and creating Streaming savepoints

2018-08-18 Thread Paris Carbone
+1

Might also be a good start to implement queryable stream state with snapshot 
isolation using that mechanism.

Paris

> On 17 Aug 2018, at 12:28, Gyula Fóra  wrote:
> 
> Hi All!
> 
> I want to share with you a little project we have been working on at King
> (with some help from some dataArtisans folks). I think this would be a
> valuable addition to Flink and solve a bunch of outstanding production
> use-cases and headaches around state bootstrapping and state analytics.
> 
> We have built a quick and dirty POC implementation on top of Flink 1.6,
> please check the README for some nice examples to get a quick idea:
> 
> https://github.com/king/bravo
> 
> *Short story*
> Bravo is a convenient state reader and writer library leveraging the
> Flink’s batch processing capabilities. It supports processing and writing
> Flink streaming savepoints. At the moment it only supports processing
> RocksDB savepoints but this can be extended in the future for other state
> backends and checkpoint types.
> 
> Our goal is to cover a few basic features:
> 
>   - Converting keyed states to Flink DataSets for processing and analytics
>   - Reading/Writing non-keyed operators states
>   - Bootstrap keyed states from Flink DataSets and create new valid
>   savepoints
>   - Transform existing savepoints by replacing/changing some states
> 
> 
> Some example use-cases:
> 
>   - Point-in-time state analytics across all operators and keys
>   - Bootstrap state of a streaming job from external resources such as
>   reading from database/filesystem
>   - Validate and potentially repair corrupted state of a streaming job
>   - Change max parallelism of a job
> 
> 
> Our main goal is to start working together with other Flink production
> users and make this something useful that can be part of Flink. So if you
> have use-cases please talk to us :)
> I have also started a google doc which contains a little bit more info than
> the readme and could be a starting place for discussions:
> 
> https://docs.google.com/document/d/103k6wPX20kMu5H3SOOXSg5PZIaYpwdhqBMr-ppkFL5E/edit?usp=sharing
> 
> I know there are a bunch of rough edges and bugs (and no tests) but our
> motto is: If you are not embarrassed, you released too late :)
> 
> Please let me know what you think!
> 
> Cheers,
> Gyula



Re: Streaming Graph processing

2017-07-04 Thread Paris Carbone
I cannot answer that for sure since graph streams are still a research topic.
It depends on the demand and on how fast graph stream representations and 
operations are adopted.

If there is high demand on Flink we can definitely start a FLIP at some point 
but for now it makes sense to see how side inputs and other related features 
will evolve to make a proper integration.
In the meantime feel free to try gelly-streaming and let us know your 
impressions so far!

cheers
Paris

On 30 Jun 2017, at 19:03, Ameet BD <ameetbigd...@gmail.com> wrote:

Hi Paris,

Thanks for the reply. Any idea when Gelly-Stream will become part of the
official Flink distribution?
Regards,
Ameet

On Fri, Jun 30, 2017 at 8:20 PM, Paris Carbone <par...@kth.se> wrote:

Hi Ameet,

Flink’s Gelly currently operates on the DataSet model.
However, we have an experimental project with Vasia (Gelly-Stream) that
does exactly that.

You can check it out and let us know directly what you think:
https://github.com/vasia/gelly-streaming

Paris

On 30 Jun 2017, at 13:17, Ameet BD <ameetbigd...@gmail.com> wrote:

Hi,

Can anyone please point me to examples of streaming graph processing based
on Gelly?

Regards,
Ameet





Re: Streaming Graph processing

2017-06-30 Thread Paris Carbone
Hi Ameet,

Flink’s Gelly currently operates on the DataSet model.
However, we have an experimental project with Vasia (Gelly-Stream) that does 
exactly that.

You can check it out and let us know directly what you think:
https://github.com/vasia/gelly-streaming

Paris

On 30 Jun 2017, at 13:17, Ameet BD <ameetbigd...@gmail.com> wrote:

Hi,

Can anyone please point me to examples of streaming graph processing based
on Gelly?

Regards,
Ameet



Re: Flink streaming job with iterations gets stuck waiting for network buffers

2017-04-03 Thread Paris Carbone
Hi Andrey,

If I am not mistaken, this sounds like a known deadlock case that can be caused 
by the combination of Flink's backpressure mechanism with iterations (more 
likely when there is heavy feedback load).
Keep in mind that, currently, iterations are (perhaps the only) unstable 
feature to use. The good news is that there is a complete redesign planned for 
them (partly FLIP-15 [1]) that should fully address this pending flow control 
issue as well.

Increasing network buffers or feedback queue capacity to a really high number 
decreases the possibility of the deadlock but does not eliminate it.
I really cannot think of a quick solution to the problem that does not involve 
some deep changes.
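
To illustrate why a larger buffer only postpones the problem, here is a toy 
model of a feedback loop. It is only a sketch under simplifying assumptions, 
not Flink's actual network stack: a single operator is the only consumer of 
its own bounded feedback queue, every record re-enters the loop `fanout` times 
for `depth` rounds, and all names (run_feedback_loop, capacity, fanout, depth) 
are made up for this example.

```python
from collections import deque


def run_feedback_loop(capacity, fanout, depth, n_source):
    """Toy model: one operator consuming its own bounded feedback queue."""
    feedback = deque()
    source_left = n_source
    while feedback or source_left:
        if feedback:
            hops = feedback.popleft()   # feedback records are served first
        else:
            source_left -= 1
            hops = depth                # a fresh record from the source
        if hops == 0:
            continue                    # record leaves the loop
        for _ in range(fanout):
            if len(feedback) >= capacity:
                # The operator would block on a full queue that only it can
                # drain, so no further progress is possible.
                return "deadlock"
            feedback.append(hops - 1)
    return "finished"


small_buffer = run_feedback_loop(capacity=4, fanout=2, depth=3, n_source=1)
large_buffer = run_feedback_loop(capacity=8, fanout=2, depth=3, n_source=1)
heavier_load = run_feedback_loop(capacity=8, fanout=2, depth=4, n_source=1)
```

In this model the larger queue survives the first workload, but a slightly 
heavier feedback load fills it again, which matches the observation that more 
buffers reduce the likelihood of the deadlock without removing it.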

I am CCing dev since this seems like a very relevant use case to revive the 
discussion for the loops redesign and also keep you in the loop (no pun 
intended) regarding this specific issue.
Will also update FLIP-15 with several interesting proposals under discussion 
from Stephan to tackle this issue.

cheers,
Paris

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-15+Scoped+Loops+and+Job+Termination


On 3 Apr 2017, at 12:54, Andrey Melentyev wrote:

Hi,

I have a Flink 1.2.0 streaming job using a number of stateful operators and an 
iteration loop with a RichFlatMapFunction inside. On the high level, the app 
reads some data, massages it and feeds into an iterative algorithm which 
produces some output and feedback while keeping the state. All stateful 
operators are on KeyedStreams. Input is some data on file system and output is 
stdout.

The implementation passes functional tests, but when tested with noticeable 
amounts of input data (tens of thousands of records, dozens of MB of raw data), 
after a few seconds of good throughput backpressure kicks in and the application 
essentially gets stuck: most of the threads are blocked waiting for buffers, and 
an occasional message gets processed every few minutes. There's nothing strange 
in the log files.

The behaviour is reproducible both in a local execution environment and in a 
Flink standalone cluster (started using jobmanager.sh and taskmanager.sh).

The problematic part is likely in the iterations since the part of the job 
before iterations works fine with the same data.

I would appreciate pointers as to how to debug this. 
taskmanager.network.numberOfBuffers from the config sounds relevant but the 
default value of 2048 is already much higher than slots-per-TM^2 * #TMs * 4 = 
4^2 * 1 * 4 = 64.
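
The rule of thumb quoted above can be written out as a small calculation. This 
is only a sketch of the arithmetic in the message; the function and parameter 
names are made up for illustration and are not Flink configuration keys.

```python
def recommended_network_buffers(slots_per_tm, num_tms, factor=4):
    """Rule of thumb from the message: slots-per-TM^2 * #TMs * 4."""
    return slots_per_tm ** 2 * num_tms * factor


# Setup described in the message: 4 slots per TaskManager, 1 TaskManager.
recommended = recommended_network_buffers(slots_per_tm=4, num_tms=1)
configured_default = 2048  # default value mentioned in the message
headroom = configured_default // recommended
```

With these numbers the default is 32 times the recommended minimum, so the 
buffer count alone is unlikely to explain the stall.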

Attaching flink config, job execution plan and thread dump with some sensitive 
parts redacted.

flink-conf.yml

jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 512
taskmanager.heap.mb: 8192
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
parallelism.default: 4
jobmanager.web.port: 8081
state.backend: rocksdb
state.backend.fs.checkpointdir: file:///Users/andrey.melentyev/tmp/flink-checkpoints

Job execution plan

{
  "nodes": [
    {
      "contents": "IterationSource-10",
      "id": -1,
      "pact": "Data Source",
      "parallelism": 8,
      "type": "IterationSource-10"
    },
    {
      "contents": "Source: Custom File Source",
      "id": 1,
      "pact": "Data Source",
      "parallelism": 1,
      "type": "Source: Custom File Source"
    },
    {
      "contents": "Split Reader: Custom File Source",
      "id": 2,
      "pact": "Operator",
      "parallelism": 8,
      "predecessors": [
        {
          "id": 1,
          "ship_strategy": "REBALANCE",
          "side": "second"
        }
      ],
      "type": "Split Reader: Custom File Source"
    },
    {
      "contents": "Parse JSON",
      "id": 3,
      "pact": "Operator",
      "parallelism": 8,
      "predecessors": [
        {
          "id": 2,
          "ship_strategy": "FORWARD",
          "side": "second"
        }
      ],
      "type": "Parse JSON"
    },
    {
      "contents": "Split records",
      "id": 4,
      "pact": "Operator",
      "parallelism": 8,
      "predecessors": [
        {
          "id": 3,
          "ship_strategy": "FORWARD",
          "side": "second"
        }
      ],
      "type": "Split records (Stateless)"
    },
    {
      "contents": "Produce Some Data",
      "id": 6,
      "pact": "Operator",
      "parallelism": 8,
      "predecessors": [
        {
          "id": 3,
          "ship_strategy": "FORWARD",
          "side": "second"
        }
      ],
      "type": "Produce Some Data (Stateless)"
    },
    {
      "contents": "Produce Some More Data (Stateful)",
      "id": 7,
      "pact": "Operator",
      "parallelism": 8,
      "predecessors": [
        {
          "id": 4,
          "ship_strategy": "HASH",
          "side": "second"
        }
      ],
      "type": "Produce Some More Data (Stateful)"
    },
    {
      "contents": "Map",
      "id": 9,
      "pact": "Operator",
      "parallelism": 8,
      "predecessors": [

Re: [DISCUSS] Per-key event time

2017-02-22 Thread Paris Carbone
Hey Jamie!

Key-based progress tracking sounds like local-only progress tracking to me: 
there is no need to use a low-watermarking mechanism at all, since all streams 
of a key are handled by a single partition at a time (per operator).
Thus, this could be much easier to implement and support (i.e., no need to 
broadcast the progress state of each partition all the time). 
State-wise it should be fine too if it is backed by RocksDB, especially if we 
have MapState in the future.
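
A rough sketch of what such local, per-key progress tracking could look like, 
next to the partition-based minimum used today. This is an illustration only, 
not a Flink API: PerKeyProgress, max_skew and ttl are hypothetical names, and 
timestamps are plain integers (minutes since midnight) to mirror the 
connected-car example in the quoted message below.

```python
def subtask_watermark(channel_watermarks):
    """Partition-based model: event time is the minimum over all inputs."""
    return min(channel_watermarks)


class PerKeyProgress:
    """Tracks event-time progress per key, with a TTL to bound state size."""

    def __init__(self, max_skew, ttl):
        self.max_skew = max_skew   # tolerated out-of-orderness within a key
        self.ttl = ttl             # drop keys that stay silent longer than this
        self._state = {}           # key -> (max event timestamp, last seen)

    def on_event(self, key, event_ts, now):
        max_ts, _ = self._state.get(key, (event_ts, now))
        self._state[key] = (max(max_ts, event_ts), now)

    def watermark(self, key):
        return self._state[key][0] - self.max_skew

    def is_late(self, key, event_ts):
        return key in self._state and event_ts < self.watermark(key)

    def expire(self, now):
        stale = [k for k, (_, seen) in self._state.items()
                 if now - seen > self.ttl]
        for k in stale:
            del self._state[k]
        return stale


progress = PerKeyProgress(max_skew=1, ttl=24 * 60)
# Car B transmits immediately: its 16:00 event arrives at 16:00.
progress.on_event("car-B", 960, now=960)
# Car A reconnects at 16:00 and delivers data recorded at 12:00.
late_globally = 720 < progress.watermark("car-B")   # judged by car B's progress
late_per_key = progress.is_late("car-A", 720)       # judged by car A's progress
progress.on_event("car-A", 720, now=960)
```

Under a single stream-wide watermark driven by car B, car A's data would count 
as late; tracked per key, it does not, and the TTL keeps the number of tracked 
keys from growing without bound.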

Just my quick thoughts on this, to get the discussion going :)

cheers
Paris

> On 23 Feb 2017, at 01:01, Jamie Grier  wrote:
> 
> Hi Flink Devs,
> 
> Use cases that I see quite frequently in the real world would benefit from
> a different watermarking / event time model than the one currently
> implemented in Flink.
> 
> I would call Flink's current approach partition-based watermarking or maybe
> subtask-based watermarking.  In this model the current "event time" is a
> property local to each subtask instance in a dataflow graph.  The event
> time at any subtask is the minimum of the watermarks it has received on
> each of its input streams.
> 
> There are a couple of issues with this model that are not optimal for some
> (maybe many) use cases.
> 
> 1) A single slow subtask (or say source partition) anywhere in the dataflow
> can mean no progress can be made on the computation at all.
> 
> 2) In many real world scenarios the time skew across keys can be *many*
> times greater than the time skew within the data with the same key.
> 
> In this discussion I'll use "time skew" to refer to the out-of-orderness
> with respect to timestamp of the data.  Out-of-orderness is a mouthful ;)
> 
> Anyway, let me provide an example or two.
> 
> In IoT applications the source of events is a particular device out in the
> world, let's say a device in a connected car application.  The data for
> some particular device may be very bursty and we will certainly get events
> from these devices in Flink out-of-order just because of things like
> partitions in Kafka, shuffles in Flink, etc.  However, the time skew in the
> data for a single device should likely be very small (milliseconds or maybe
> seconds).
> 
> However, in the same application the time skew across different devices can
> be huge (hours or even days).  An obvious example of this, again using
> connected cars as a representative example is the following:  Car A is
> recording data locally at 12:00 pm on Saturday but doesn't currently have a
> network connection.  Car B is doing the same thing but does have a network
> connection.  Car A will transmit its data when the network comes back on
> line.  Let's say this is at 4pm.  Car B was transmitting its data
> immediately.  This creates a huge time skew (4 hours) in the observed
> datastream when looked at as a whole.  However, the time skew in that data
> for Car A or Car B alone could be tiny.  It will be out of order of course
> but maybe by only milliseconds or seconds.
> 
> What the above means in the end for Flink is that the watermarks must be
> delayed by up to 4 hours or more because we're looking at the data stream
> as a whole -- otherwise the data for Car A will be considered late.  The
> time skew in the data stream when looked at as a whole is large even though
> the time skew for any key may be tiny.
> 
> This is the problem I would like to see a solution for.  The basic idea of
> keeping track of watermarks and event time "per-key" rather than per
> partition or subtask would solve I think both of these problems stated
> above and both of these are real issues for production applications.
> 
> The obvious downside of trying to do this per-key is that the amount of
> state you have to track is much larger and potentially unbounded.  However,
> I could see this approach working if the keyspace isn't growing rapidly but
> is stable or grows slowly.  The saving grace here is that this may actually
> be true of the types of applications where this would be especially
> useful.  Think IoT use cases.  Another approach to keeping state size in
> check would be a configurable TTL for a key.
> 
> Anyway, I'm throwing this out here on the mailing list in case anyone is
> interested in this discussion, has thought about the problem deeply
> already, has use cases of their own they've run into or has ideas for a
> solution to this problem.
> 
> Thanks for reading..
> 
> -Jamie
> 
> 
> -- 
> 
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier 
> ja...@data-artisans.com



Re: [ANNOUNCE] Welcome Stefan Richter as a new committer

2017-02-10 Thread Paris Carbone
Congrats Stefan! Keep up the neat contributions.

> On 10 Feb 2017, at 17:10, Haohui Mai  wrote:
> 
> Congrats!
> On Fri, Feb 10, 2017 at 8:08 AM Henry Saputra 
> wrote:
> 
>> Congrats and welcome!
>> 
>> On Fri, Feb 10, 2017 at 7:45 AM, Tzu-Li (Gordon) Tai 
>> wrote:
>> 
>>> Great news! Welcome Stefan :-D
>>> 
>>> 
>>> On February 10, 2017 at 11:36:14 PM, Aljoscha Krettek (
>> aljos...@apache.org)
>>> wrote:
>>> 
>>> Welcome! :-)
>>> 
>>> On Fri, 10 Feb 2017 at 16:10 Till Rohrmann  wrote:
>>> 
 Great to have you on board as a committer Stefan :-)
 
 On Fri, Feb 10, 2017 at 3:32 PM, Greg Hogan 
>> wrote:
 
> Welcome, Stefan, and thank you for your contributions!
> 
> On Fri, Feb 10, 2017 at 5:00 AM, Ufuk Celebi  wrote:
> 
>> Hey everyone,
>> 
>> I'm very happy to announce that the Flink PMC has accepted Stefan
>> Richter to become a committer of the Apache Flink project.
>> 
>> Stefan has been part of the community for almost a year now and has worked on
>> major features of the latest 1.2 release, most notably rescaling
>> and
>> backwards compatibility of program state.
>> 
>> Please join me in welcoming Stefan. :-)
>> 
>> – Ufuk
>> 
> 
 
>>> 
>> 



Re: [ANNOUNCE] Welcome Jark Wu and Kostas Kloudas as committers

2017-02-08 Thread Paris Carbone
welcome aboard Kostas and Jark :)

Paris

> On 7 Feb 2017, at 21:16, Fabian Hueske  wrote:
> 
> Hi everybody,
> 
> I'm very happy to announce that Jark Wu and Kostas Kloudas accepted the
> invitation of the Flink PMC to become committers of the Apache Flink
> project.
> 
> Jark and Kostas are longtime members of the Flink community.
> Both are actively driving Flink's development and contributing to its
> community in many ways.
> 
> Please join me in welcoming Kostas and Jark as committers.
> 
> Fabian



Re: Unregistering Managed State in Operator Backend

2017-01-24 Thread Paris Carbone
Indeed, I noticed that now. Then it should be fairly simple, if you find it 
reasonable too.

> On 24 Jan 2017, at 14:20, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Just a bit of clarification, the OperatorState stuff is independent of
> keyed state backends, i.e. even if you use RocksDB the operator state will
> not be stored in RocksDB, only keyed state is stored there.
> 
> Right now, when an operator state (ListState) is empty we will still write
> some meta data about that state. I think it should be easy to
> change DefaultOperatorStateBackend to not write anything in case of an
> empty state. What do you think, Stefan?
> 
> On Tue, 24 Jan 2017 at 12:12 Paris Carbone <par...@kth.se> wrote:
> 
>> Sure Till,
>> 
>> I would love to also make the patch but need to prioritize some other
>> things these days.
>> At least I will dig and see how complex this is regarding the different
>> backends.
>> 
>> I also have some follow-up questions, in case anybody has thought about
>> these things already (or is simply interested):
>> 
>> - Do you think it would make sense to automatically garbage collect empty
>> states in general?
>> - Shouldn't this happen already during snapshot compaction (in rocksdb)
>> and would that violate any user assumptions in your view?
>> 
>> 
>>> On 24 Jan 2017, at 11:44, Till Rohrmann <trohrm...@apache.org> wrote:
>>> 
>>> Hi Paris,
>>> 
>>> if there is no such issue open, then please open one so that we can track
>>> the issue. If you have time to work on that even better :-)
>>> 
>>> Cheers,
>>> Till
>>> 
>>> On Tue, Jan 24, 2017 at 10:25 AM, Paris Carbone <par...@kth.se> wrote:
>>> 
>>>> Any thoughts/plans?
>>>> So should I open a Jira and add this?
>>>> 
>>>> Paris
>>>> 
>>>> On Jan 21, 2017, at 5:17 PM, Paris Carbone <par...@kth.se> wrote:
>>>> 
>>>> Thank you for the answer Ufuk!
>>>> 
>>>> To elaborate a bit more: I am not using keyed state; it would indeed be
>>>> tricky in that case to discard everything.
>>>> 
>>>> I need that for operator state, in my loop fault tolerance PR [1].  The
>>>> idea is to tag a ListState (upstream log) per snapshot id.
>>>> When a concurrent snapshot is committed I want to simply remove everything
>>>> related to that ListState (not just clear it). This would also eliminate a
>>>> memory leak in case many empty logs (and thus state entries) accumulate
>>>> over time.
>>>> Hope that makes it a bit more clear. Thanks again :)
>>>> 
>>>> Paris
>>>> 
>>>> [1] https://github.com/apache/flink/pull/1668
>>>> 
>>>> 
>>>> On 21 Jan 2017, at 17:10, Ufuk Celebi <u...@apache.org> wrote:
>>>> 
>>>> Hey Paris!
>>>> 
>>>> As far as I know it's not possible at the moment and not planned. Does
>>>> not sound too hard to add though. @Stefan: correct?
>>>> 
>>>> You can currently only clear the state via #clear in the scope of the
>>>> key for keyed state or the whole operator when used with operator
>>>> state. In case of keyed state it's indeed hard to clear all state; for
>>>> operator state it's slightly better. I'm curious what your use case
>>>> is?
>>>> 
>>>> - Ufuk
>>>> 
>>>> 
>>>> On Fri, Jan 20, 2017 at 5:59 PM, Paris Carbone <par...@kth.se> wrote:
>>>> Hi folks,
>>>> 
>>>> I have a little question regarding the managed store operator backend, in
>>>> case someone can help.
>>>> 
>>>> Is there some convenient way (planned or under development) to completely
>>>> unregister a state entry (e.g. a ListState) with a given id from the
>>>> backend?
>>>> It is fairly easy to register new states dynamically (i.e. with
>>>> getOperatorState(...)), so why not be able to discard them as well?
>>>> 
>>>> I would find this feature extremely convenient to a fault tolerance
>>>> related PR I am working on but I can think of many use cases that might
>>>> need it.
>>>> 
>>>> 
>>>> Paris
>>>> 
>>>> 
>>>> 
>> 
>> 



Re: Unregistering Managed State in Operator Backend

2017-01-24 Thread Paris Carbone
Sure Till,

I would love to also make the patch but need to prioritize some other things 
these days. 
At least I will dig and see how complex this is regarding the different 
backends.
 
I also have some follow-up questions, in case anybody has thought about these 
things already (or is simply interested):
 
- Do you think it would make sense to automatically garbage collect empty 
states in general? 
- Shouldn't this happen already during snapshot compaction (in rocksdb) and 
would that violate any user assumptions in your view?


> On 24 Jan 2017, at 11:44, Till Rohrmann <trohrm...@apache.org> wrote:
> 
> Hi Paris,
> 
> if there is no such issue open, then please open one so that we can track
> the issue. If you have time to work on that even better :-)
> 
> Cheers,
> Till
> 
> On Tue, Jan 24, 2017 at 10:25 AM, Paris Carbone <par...@kth.se> wrote:
> 
>> Any thoughts/plans?
>> So should I open a Jira and add this?
>> 
>> Paris
>> 
>> On Jan 21, 2017, at 5:17 PM, Paris Carbone <par...@kth.se> wrote:
>> 
>> Thank you for the answer Ufuk!
>> 
>> To elaborate a bit more: I am not using keyed state; it would indeed be
>> tricky in that case to discard everything.
>> 
>> I need that for operator state, in my loop fault tolerance PR [1].  The
>> idea is to tag a ListState (upstream log) per snapshot id.
>> When a concurrent snapshot is committed I want to simply remove everything
>> related to that ListState (not just clear it). This would also eliminate a
>> memory leak in case many empty logs (and thus state entries) accumulate
>> over time.
>> Hope that makes it a bit more clear. Thanks again :)
>> 
>> Paris
>> 
>> [1] https://github.com/apache/flink/pull/1668
>> 
>> 
>> On 21 Jan 2017, at 17:10, Ufuk Celebi <u...@apache.org> wrote:
>> 
>> Hey Paris!
>> 
>> As far as I know it's not possible at the moment and not planned. Does
>> not sound too hard to add though. @Stefan: correct?
>> 
>> You can currently only clear the state via #clear in the scope of the
>> key for keyed state or the whole operator when used with operator
>> state. In case of keyed state it's indeed hard to clear all state; for
>> operator state it's slightly better. I'm curious what your use case
>> is?
>> 
>> - Ufuk
>> 
>> 
>> On Fri, Jan 20, 2017 at 5:59 PM, Paris Carbone <par...@kth.se> wrote:
>> Hi folks,
>> 
>> I have a little question regarding the managed store operator backend, in
>> case someone can help.
>> 
>> Is there some convenient way (planned or under development) to completely
>> unregister a state entry (e.g. a ListState) with a given id from the
>> backend?
>> It is fairly easy to register new states dynamically (i.e. with
>> getOperatorState(...)), so why not be able to discard them as well?
>> 
>> I would find this feature extremely convenient to a fault tolerance
>> related PR I am working on but I can think of many use cases that might
>> need it.
>> 
>> 
>> Paris
>> 
>> 
>> 



Re: Unregistering Managed State in Operator Backend

2017-01-24 Thread Paris Carbone
Any thoughts/plans?
So should I open a Jira and add this?

Paris

On Jan 21, 2017, at 5:17 PM, Paris Carbone <par...@kth.se> wrote:

Thank you for the answer Ufuk!

To elaborate a bit more: I am not using keyed state; it would indeed be tricky 
in that case to discard everything.

I need that for operator state, in my loop fault tolerance PR [1].  The idea is 
to tag a ListState (upstream log) per snapshot id.
When a concurrent snapshot is committed I want to simply remove everything 
related to that ListState (not just clear it). This would also eliminate a 
memory leak in case many empty logs (and thus state entries) accumulate over time.
Hope that makes it a bit more clear. Thanks again :)

Paris

[1] https://github.com/apache/flink/pull/1668


On 21 Jan 2017, at 17:10, Ufuk Celebi <u...@apache.org> wrote:

Hey Paris!

As far as I know it's not possible at the moment and not planned. Does
not sound too hard to add though. @Stefan: correct?

You can currently only clear the state via #clear in the scope of the
key for keyed state or the whole operator when used with operator
state. In case of keyed state it's indeed hard to clear all state; for
operator state it's slightly better. I'm curious what your use case
is?

- Ufuk


On Fri, Jan 20, 2017 at 5:59 PM, Paris Carbone <par...@kth.se> wrote:
Hi folks,

I have a little question regarding the managed store operator backend, in case 
someone can help.

Is there some convenient way (planned or under development) to completely 
unregister a state entry (e.g. a ListState) with a given id from the backend?
It is fairly easy to register new states dynamically (i.e. with 
getOperatorState(...)), so why not be able to discard them as well?

I would find this feature extremely convenient to a fault tolerance related PR 
I am working on but I can think of many use cases that might need it.


Paris




Re: Unregistering Managed State in Operator Backend

2017-01-21 Thread Paris Carbone
Thank you for the answer Ufuk!

To elaborate a bit more: I am not using keyed state; it would indeed be tricky 
in that case to discard everything.

I need that for operator state, in my loop fault tolerance PR [1].  The idea is 
to tag a ListState (upstream log) per snapshot id.
When a concurrent snapshot is committed I want to simply remove everything 
related to that ListState (not just clear it). This would also eliminate a 
memory leak in case many empty logs (and thus state entries) accumulate over time.
Hope that makes it a bit more clear. Thanks again :)
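
To make the difference between clearing and unregistering concrete, here is a 
toy model. It is only a sketch and not Flink's operator state backend: 
ToyOperatorStateBackend, unregister and snapshot_metadata are hypothetical 
names, and the state ids are made up for illustration.

```python
class ToyOperatorStateBackend:
    """Toy stand-in for an operator state backend; not Flink's API."""

    def __init__(self):
        self._states = {}  # state name -> list of elements

    def get_operator_state(self, name):
        # Registers the state on first access (dynamic registration).
        return self._states.setdefault(name, [])

    def unregister(self, name):
        # Hypothetical operation asked for in this thread.
        self._states.pop(name, None)

    def snapshot_metadata(self):
        # Names that a snapshot would record, including empty states.
        return sorted(self._states)


backend = ToyOperatorStateBackend()
for snapshot_id in (1, 2, 3):
    log = backend.get_operator_state(f"upstream-log-{snapshot_id}")
    log.append(f"in-flight-record-{snapshot_id}")

# Snapshot 1 is committed: clearing empties the log but keeps its entry.
backend.get_operator_state("upstream-log-1").clear()
after_clear = backend.snapshot_metadata()

# Unregistering removes the entry itself, so nothing accumulates.
backend.unregister("upstream-log-1")
after_unregister = backend.snapshot_metadata()
```

After the clear, the empty log is still listed in the metadata; only the 
unregister call removes it, which is the accumulation of empty entries I would 
like to avoid.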

Paris

[1] https://github.com/apache/flink/pull/1668


On 21 Jan 2017, at 17:10, Ufuk Celebi <u...@apache.org> wrote:

Hey Paris!

As far as I know it's not possible at the moment and not planned. Does
not sound too hard to add though. @Stefan: correct?

You can currently only clear the state via #clear in the scope of the
key for keyed state or the whole operator when used with operator
state. In case of keyed state it's indeed hard to clear all state; for
operator state it's slightly better. I'm curious what your use case
is?

– Ufuk


On Fri, Jan 20, 2017 at 5:59 PM, Paris Carbone <par...@kth.se> wrote:
Hi folks,

I have a little question regarding the managed store operator backend, in case 
someone can help.

Is there some convenient way (planned or under development) to completely 
unregister a state entry (e.g. a ListState) with a given id from the backend?
It is fairly easy to register new states dynamically (i.e. with 
getOperatorState(…)), so why not be able to discard them as well?

I would find this feature extremely convenient to a fault tolerance related PR 
I am working on but I can think of many use cases that might need it.


Paris




Re: Taking time off

2017-01-21 Thread Paris Carbone
Thanks for all the cool contributions on Flink and Beam!
Keep rockin' in the distr systems space :)

Paris

> On 21 Jan 2017, at 00:53, Fabian Hueske  wrote:
> 
> Hi Max,
> 
> Thanks for all your efforts!
> Hope to see you back soon.
> 
> Take care, Fabian
> 
> 
> 2017-01-16 11:22 GMT+01:00 Vasiliki Kalavri :
> 
>> Hi Max,
>> 
>> thank you for all your work! Enjoy your time off and hope to have you back
>> with us soon ^^
>> 
>> Cheers,
>> -Vasia.
>> 
>> On 14 January 2017 at 09:03, Maximilian Michels  wrote:
>> 
>>> Dear Squirrels,
>>> 
>>> Thank you! It's been very exciting to see the Flink community grow and
>>> flourish over the past two years.
>>> 
>>> For the beginning of this year, I decided to take some time off, which
>>> means I'll be less engaged on the mailing list or on GitHub/JIRA.
>>> 
>>> In the meantime, if you have any questions I might be able to answer,
>> feel
>>> free to contact me. Looking forward to see the squirrels rise further!
>>> 
>>> Best,
>>> Max
>>> 
>> 



Unregistering Managed State in Operator Backend

2017-01-20 Thread Paris Carbone
Hi folks,

I have a little question regarding the managed store operator backend, in case 
someone can help. 

Is there some convenient way (planned or under development) to completely 
unregister a state entry (e.g. a ListState) with a given id from the backend?
It is fairly easy to register new states dynamically (i.e. with 
getOperatorState(…)), so why not be able to discard them as well?

I would find this feature extremely convenient to a fault tolerance related PR 
I am working on but I can think of many use cases that might need it.


Paris



Re: [DISCUSS] Time-based releases in Flink

2017-01-18 Thread Paris Carbone

Re: [DISCUSS] (Not) tagging reviewers

2017-01-16 Thread Paris Carbone
I also agree with all the points, especially when it comes to new PRs.

Though, when someone has started reviewing a PR and shows interest, it probably 
makes sense for them to finish doing so. Wouldn’t tagging be acceptable there?
In those cases tagging triggers direct notifications, so that people already 
involved in a conversation get reminded and answer pending questions. 

> On 16 Jan 2017, at 12:45, Fabian Hueske  wrote:
> 
> Thanks for bringing this up Stephan.
> I completely agree with you.
> 
> Cheers, Fabian
> 
> 2017-01-16 12:42 GMT+01:00 Stephan Ewen :
> 
>> Hi!
>> 
>> I have seen that recently many pull requests designate reviewers by writing
>> "@personA review please" or so.
>> 
>> I am personally quite strongly against that, I think it hurts the community
>> work:
>> 
>>  - The same few people get usually "designated" and will typically get
>> overloaded and often not do the review.
>> 
>>  - At the same time, this discourages other community members from looking
>> at the pull request, which is totally undesirable.
>> 
>>  - In general, review participation should be "pull based" (person decides
>> what they want to work on) not "push based" (random person pushes work to
>> another person). Push-based just creates the wrong feeling in a community
>> of volunteers.
>> 
>>  - In many cases the designated reviewers are not the ones most
>> knowledgeable in the code, which is understandable, because how should
>> contributors know whom to tag?
>> 
>> 
>> Long story short, why don't we just drop that habit?
>> 
>> 
>> Greetings,
>> Stephan
>> 



Re: [ANNOUNCE] Stability/Scalability effort and Pull Requests

2016-12-02 Thread Paris Carbone
This is really huge. Highly appreciated Stephan and the rest :)

The unfair input handling you mentioned may have been responsible for the 
incredibly slow processing of records through not-so-active feedback edges 
(e.g. in ML projects).

cheers
Paris 

> On 2 Dec 2016, at 14:48, Stephan Ewen  wrote:
> 
> Hi fellow Flink enthusiasts!
> 
> Over the past weeks, several community members have been working hard on a
> thread of issues to make Flink more stable and scalable for some demanding
> large scale deployments.
> 
> It was quite a lot of work digging through the masses of logs, setting up
> clusters to reproduce issues, debugging, and so on. Because of that
> intensive time involvement, some of us spent considerably less time on Pull
> Requests - as a result we have quite a backlog of pull requests.
> 
> We are nearing the end of this effort, and I believe the pull requests can
> expect to get more attention again in the near future.
> 
> 
> The *good news* is that this effort really pushed Flink a lot further. We
> addressed a lot of issues that now make Flink run a lot smoother in various
> setups.
> 
> Here is a sample of issues we addressed as part of that work:
> 
>  - The Network Stack had a quite serious issue with (un)fairness of stream
> transport under backpressure.
> 
>  - Checkpoints are more robust now - they decline better when they cannot
> complete and can have a limit for how much data may be buffered/spilled
> during alignment
> 
>  - Cleanup of old checkpoint state happens more reliably and scalably
> 
>  - Robustness of HA recovery is increased, in the presence of ZooKeeper
> problems and inconsistent state in ZooKeeper
> 
>  - Improving the cancellation behavior of checkpoints and connectors.
> Adding a safeguard against tasks and libraries that block the TaskManagers
> during cancellation.
> 
>  - We diagnosed a RocksDB bug and updated to a fixed version
> 
>  - Deployment scalability: Getting rid of some RPC messages, handling a
> large RPC volume on deployment better
> 
>  - Memory leaks in the JobManager in the presence of many restarts
> 
>  - Some bugs in the life cycle of tasks and the execution graph to prevent
> issues with reliable fault tolerance.
> 
> 
> I think we'll all enjoy the results of this effort, especially those that
> have to deploy and maintain large setups.
> 
> 
> Greetings,
> Stephan



Re: [DISCUSS] deprecated function need more detail

2016-11-23 Thread Paris Carbone
+1

This should always be the norm, especially for user-facing code.

While we are at it, perhaps when someone deprecates functionality, existing
usages should also be replaced with the new alternative right away.
E.g. Checkpointed is deprecated, but all state management tests are actually
still using it.

cheers
Paris


> On 23 Nov 2016, at 11:21, Kostas Kloudas  wrote:
> 
> +1 and we should apply the same to all deprecated interfaces/abstract classes.
> 
>> On Nov 23, 2016, at 11:13 AM, Aljoscha Krettek  wrote:
>> 
>> +1 That sounds excellent.
>> 
>> On Wed, 23 Nov 2016 at 11:04 Till Rohrmann  wrote:
>> 
>>> +1 for your proposal.
>>> 
>>> Cheers,
>>> Till
>>> 
>>> On Wed, Nov 23, 2016 at 9:33 AM, Fabian Hueske  wrote:
>>> 
>>>> I agree on this one.
>>>> Whenever we deprecate a method or a feature we should add a comment that
>>>> explains the new API or why the feature was removed without replacement.
>>>> 
>>>> Enforcing this information through checkstyle makes sense as well, IMO.
>>>> 
>>>> Cheers, Fabian
>>>> 
>>>> 2016-11-23 4:42 GMT+01:00 sjk :
>>>> 
>>>>> Hi, all
>>>>> 
>>>>> Let’s have a look at the Checkpointed interface below. It is declared
>>>>> deprecated but gives no detail on why, since when, and how to replace it.
>>>>> That is a big problem for users.
>>>>> 
>>>>> @Deprecated
>>>>> @PublicEvolving
>>>>> public interface Checkpointed extends
>>>>> CheckpointedRestoring {
>>>>> 
>>>>> 
>>>>> I think we should give more detail: when it will be removed, what replaces
>>>>> it, and why it is deprecated.
>>>>> 
>>>>> For Java code, add the detailed deprecation reason in the code annotations.
>>>>> For Scala code, replace the Java annotation @Deprecated(,,) with the Scala
>>>>> annotation @deprecated, such as
>>>>> @deprecated(message = "the reason", since = "when fully give up")
>>>>> 
>>>>> Add this rule to the customized checkstyle plugin for Maven and SBT.
>>>>> 
>>>>> Best regards
>>>>> -Jinkui Shi
>>>> 
>>> 
> 
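
The proposal in this thread could look roughly like the following in Java. This is only a minimal sketch: the interface names OldCheckpointed and NewCheckpointed, the version string, and the wording of the explanation are made up for illustration and are not Flink's actual API. The only standard pieces are the @Deprecated annotation and the Javadoc @deprecated tag, which is where Java code usually states the reason and the replacement.

```java
import java.util.List;

/** Hypothetical replacement interface, named here only for illustration. */
interface NewCheckpointed<T> {
    List<T> snapshotState(long checkpointId);
}

/**
 * Hypothetical legacy interface.
 *
 * @deprecated Since 1.2 (illustrative version). Use {@link NewCheckpointed}
 *     instead. The tag answers the three questions raised in the thread:
 *     since when, what replaces it, and why.
 */
@Deprecated
interface OldCheckpointed<T> {
    T snapshotState(long checkpointId);
}

/** Small helper showing that the annotation is visible at runtime. */
final class DeprecationCheck {
    static boolean isDeprecated(Class<?> type) {
        // java.lang.Deprecated has runtime retention, so reflection can see it.
        return type.isAnnotationPresent(Deprecated.class);
    }
}
```

A checkstyle rule as suggested above would then only need to verify that every @Deprecated element also carries a non-empty @deprecated Javadoc tag.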



[jira] [Created] (FLINK-5089) Introduce Loop functions and Enforce Nesting on Data Streams

2016-11-17 Thread Paris Carbone (JIRA)
Paris Carbone created FLINK-5089:


 Summary: Introduce Loop functions and Enforce Nesting on Data 
Streams
 Key: FLINK-5089
 URL: https://issues.apache.org/jira/browse/FLINK-5089
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Paris Carbone
Assignee: Paris Carbone


The current programming model of stream iterations in the DataStream API is 
exposed through low-level dataflow operations for placing arbitrary 
feedback edges. This strategy does not enforce proper nesting and requires 
user-defined timeouts to enforce termination. It further impacts correctness, 
resulting in incomplete processing and/or inconsistent state when iterations 
exist in the topology. There is a need for a new functional, yet compositional 
API (i.e. nested loops) for expressing asynchronous DataStream loops, so that 
decentralized progress establishment algorithms, such as distributed 
termination and parallel structured or fixpoint iterations, can be built easily.

This Jira explains parts of the needs addressed by FLIP-15 on Loops and 
Terminations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] FLIP-14: Loops API and Termination

2016-11-17 Thread Paris Carbone
That was fast!  Seems to be working. 
Thank you Fabian! 

> On 17 Nov 2016, at 13:58, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Paris,
> 
> just gave you the permissions (I hope).
> Let me know if something does not work.
> 
> Cheers, Fabian
> 
> 2016-11-17 13:48 GMT+01:00 Paris Carbone <par...@kth.se>:
> 
>> We do not have to schedule this for an early Flink release, just saying.
>> I would just like to get the changes out and you people can review it and
>> integrate it anytime at your own pace.
>> 
>> Who is the admin of the wiki? It would be nice to get write access.
>> 
>>> On 17 Nov 2016, at 13:45, Paris Carbone <par...@kth.se> wrote:
>>> 
>>> Sounds like a plan!
>>> 
>>> Can someone grant me access to write in the wiki please?
>>> My username is “senorcarbone”.
>>> 
>>> Paris
>>> 
>>>> On 16 Nov 2016, at 14:30, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>> 
>>>> I am not completely sure whether we should deprecate the old API for
>> 1.2 or
>>>> remove it completely. Personally I am in favor of removing it, I don't
>>>> think it is a huge burden to move to the new one if it makes for a much
>>>> nicer user experience.
>>>> 
>>>> I think you can go ahead add the FLIP to the wiki and open the PR so we
>> can
>>>> start the review if you have it ready anyways.
>>>> 
>>>> Gyula
>>>> 
>>>> Paris Carbone <par...@kth.se> wrote (on Wed, 16 Nov 2016, 11:55):
>>>> 
>>>>> Thanks for reviewing, Gyula.
>>>>> 
>>>>> One thing that is still up to discussion is whether we should remove
>>>>> completely the old iterations API or simply mark it as deprecated till
>> v2.0.
>>>>> Also, not sure what is the best process now. We have the changes ready.
>>>>> Should I copy the FLIP to the wiki and trigger the PRs or wait for a
>> few
>>>>> more days in case someone has objections?
>>>>> 
>>>>> @Stephan, what is your take on our interpretation of the approach you
>>>>> suggested? Should we proceed or is there anything that you do not find
>> nice?
>>>>> 
>>>>> Paris
>>>>> 
>>>>>> On 15 Nov 2016, at 10:01, Gyula Fóra <gyf...@apache.org> wrote:
>>>>>> 
>>>>>> Hi Paris,
>>>>>> 
>>>>>> I like the proposed changes to the iteration API, this cleans up
>> things
>>>>> in
>>>>>> the Java API without any strict restriction I think (it was never a
>>>>> problem
>>>>>> in the Scala API).
>>>>>> 
>>>>>> The termination algorithm based on the proposed scoped loops seems to
>> be
>>>>>> fairly simple and looks good :)
>>>>>> 
>>>>>> Cheers,
>>>>>> Gyula
>>>>>> 
>>>>>> Paris Carbone <par...@kth.se> wrote (on Mon, 14 Nov 2016, 8:50):
>>>>>> 
>>>>>>> That would be great Shi! Let's take that offline.
>>>>>>> 
>>>>>>> Anyone else interested in the iteration changes? It would be nice to
>>>>>>> incorporate these to v1.2 if possible so I count on your review asap.
>>>>>>> 
>>>>>>> cheers,
>>>>>>> Paris
>>>>>>> 
>>>>>>> On Nov 14, 2016, at 2:59 AM, xiaogang.sxg <xiaogang@alibaba-inc.com> wrote:
>>>>>>> 
>>>>>>> Hi Paris
>>>>>>> 
>>>>>>> Unfortunately, the project is not public yet.
>>>>>>> But i can provide you a primitive implementation of the update
>> protocol
>>>>> in
>>>>>>> the paper. It’s implemented in Storm. Since the protocol assumes the
>>>>>>> communication channels between different tasks are dual, i think it’s
>>>>> not
>>>>>>> easy to adapt it to Flink.
>>>>>>> 
>>>>>>> Regards
>>>>>>> Xiaogang
>>>>>>> 
>>>>>>> 
>>>>>>> On 12 Nov 2016, at 3:03 AM, Paris Carbon

Re: [DISCUSS] FLIP-14: Loops API and Termination

2016-11-17 Thread Paris Carbone
We do not have to schedule this for an early Flink release, just saying.
I would just like to get the changes out and you people can review it and 
integrate it anytime at your own pace.

Who is the admin of the wiki? It would be nice to get write access.

> On 17 Nov 2016, at 13:45, Paris Carbone <par...@kth.se> wrote:
> 
> Sounds like a plan!
> 
> Can someone grant me access to write in the wiki please?
> My username is “senorcarbone”.
> 
> Paris
> 
>> On 16 Nov 2016, at 14:30, Gyula Fóra <gyula.f...@gmail.com> wrote:
>> 
>> I am not completely sure whether we should deprecate the old API for 1.2 or
>> remove it completely. Personally I am in favor of removing it, I don't
>> think it is a huge burden to move to the new one if it makes for a much
>> nicer user experience.
>> 
>> I think you can go ahead add the FLIP to the wiki and open the PR so we can
>> start the review if you have it ready anyways.
>> 
>> Gyula
>> 
>> Paris Carbone <par...@kth.se> wrote (on Wed, 16 Nov 2016, 11:55):
>> 
>>> Thanks for reviewing, Gyula.
>>> 
>>> One thing that is still up to discussion is whether we should remove
>>> completely the old iterations API or simply mark it as deprecated till v2.0.
>>> Also, not sure what is the best process now. We have the changes ready.
>>> Should I copy the FLIP to the wiki and trigger the PRs or wait for a few
>>> more days in case someone has objections?
>>> 
>>> @Stephan, what is your take on our interpretation of the approach you
>>> suggested? Should we proceed or is there anything that you do not find nice?
>>> 
>>> Paris
>>> 
>>>> On 15 Nov 2016, at 10:01, Gyula Fóra <gyf...@apache.org> wrote:
>>>> 
>>>> Hi Paris,
>>>> 
>>>> I like the proposed changes to the iteration API, this cleans up things
>>> in
>>>> the Java API without any strict restriction I think (it was never a
>>> problem
>>>> in the Scala API).
>>>> 
>>>> The termination algorithm based on the proposed scoped loops seems to be
>>>> fairly simple and looks good :)
>>>> 
>>>> Cheers,
>>>> Gyula
>>>> 
>>>> Paris Carbone <par...@kth.se> wrote (on Mon, 14 Nov 2016, 8:50):
>>>> 
>>>>> That would be great Shi! Let's take that offline.
>>>>> 
>>>>> Anyone else interested in the iteration changes? It would be nice to
>>>>> incorporate these to v1.2 if possible so I count on your review asap.
>>>>> 
>>>>> cheers,
>>>>> Paris
>>>>> 
>>>>> On Nov 14, 2016, at 2:59 AM, xiaogang.sxg <xiaogang@alibaba-inc.com> wrote:
>>>>> 
>>>>> Hi Paris
>>>>> 
>>>>> Unfortunately, the project is not public yet.
>>>>> But i can provide you a primitive implementation of the update protocol
>>> in
>>>>> the paper. It’s implemented in Storm. Since the protocol assumes the
>>>>> communication channels between different tasks are dual, i think it’s
>>> not
>>>>> easy to adapt it to Flink.
>>>>> 
>>>>> Regards
>>>>> Xiaogang
>>>>> 
>>>>> 
>>>>> On 12 Nov 2016, at 3:03 AM, Paris Carbone <par...@kth.se> wrote:
>>>>> 
>>>>> Hi Shi,
>>>>> 
>>>>> Naiad/Timely Dataflow and other projects use global coordination which
>>> is
>>>>> very convenient for asynchronous progress tracking in general but it has
>>>>> some downsides in a production systems that count on in-flight
>>>>> transactional control mechanisms and rollback recovery guarantees. This
>>> is
>>>>> why we generally prefer decentralized approaches (despite their our
>>>>> downsides).
>>>>> 
>>>>> Regarding synchronous/structured iterations, this is a bit off topic and
>>>>> they are a bit of a different story as you already know.
>>>>> We maintain a graph streaming (gelly-streams) library on Flink that you
>>>>> might find interesting [1]. Vasia, another Flink committer is also
>>> working
>>>>> on that among others.
>>>>> You can keep an eye on i

Re: [DISCUSS] FLIP-14: Loops API and Termination

2016-11-17 Thread Paris Carbone
Sounds like a plan!

Can someone grant me access to write in the wiki please?
My username is “senorcarbone”.

Paris

> On 16 Nov 2016, at 14:30, Gyula Fóra <gyula.f...@gmail.com> wrote:
> 
> I am not completely sure whether we should deprecate the old API for 1.2 or
> remove it completely. Personally I am in favor of removing it, I don't
> think it is a huge burden to move to the new one if it makes for a much
> nicer user experience.
> 
> I think you can go ahead and add the FLIP to the wiki and open the PR so we
> can start the review if you have it ready anyways.
> 
> Gyula
> 
> Paris Carbone <par...@kth.se> wrote (on Wed, 16 Nov 2016, 11:55):
> 
>> Thanks for reviewing, Gyula.
>> 
>> One thing that is still up to discussion is whether we should remove
>> completely the old iterations API or simply mark it as deprecated till v2.0.
>> Also, not sure what is the best process now. We have the changes ready.
>> Should I copy the FLIP to the wiki and trigger the PRs or wait for a few
>> more days in case someone has objections?
>> 
>> @Stephan, what is your take on our interpretation of the approach you
>> suggested? Should we proceed or is there anything that you do not find nice?
>> 
>> Paris
>> 
>>> On 15 Nov 2016, at 10:01, Gyula Fóra <gyf...@apache.org> wrote:
>>> 
>>> Hi Paris,
>>> 
>>> I like the proposed changes to the iteration API, this cleans up things
>> in
>>> the Java API without any strict restriction I think (it was never a
>> problem
>>> in the Scala API).
>>> 
>>> The termination algorithm based on the proposed scoped loops seems to be
>>> fairly simple and looks good :)
>>> 
>>> Cheers,
>>> Gyula
>>> 
>>> Paris Carbone <par...@kth.se> wrote (on Mon, 14 Nov 2016, 8:50):
>>> 
>>>> That would be great Shi! Let's take that offline.
>>>> 
>>>> Anyone else interested in the iteration changes? It would be nice to
>>>> incorporate these to v1.2 if possible so I count on your review asap.
>>>> 
>>>> cheers,
>>>> Paris
>>>> 
>>>> On Nov 14, 2016, at 2:59 AM, xiaogang.sxg <xiaogang@alibaba-inc.com> wrote:
>>>> 
>>>> Hi Paris
>>>> 
>>>> Unfortunately, the project is not public yet.
>>>> But i can provide you a primitive implementation of the update protocol
>> in
>>>> the paper. It’s implemented in Storm. Since the protocol assumes the
>>>> communication channels between different tasks are dual, i think it’s
>> not
>>>> easy to adapt it to Flink.
>>>> 
>>>> Regards
>>>> Xiaogang
>>>> 
>>>> 
>>>> On 12 Nov 2016, at 3:03 AM, Paris Carbone <par...@kth.se> wrote:
>>>> 
>>>> Hi Shi,
>>>> 
>>>> Naiad/Timely Dataflow and other projects use global coordination which
>> is
>>>> very convenient for asynchronous progress tracking in general but it has
>>>> some downsides in a production systems that count on in-flight
>>>> transactional control mechanisms and rollback recovery guarantees. This
>> is
>>>> why we generally prefer decentralized approaches (despite their our
>>>> downsides).
>>>> 
>>>> Regarding synchronous/structured iterations, this is a bit off topic and
>>>> they are a bit of a different story as you already know.
>>>> We maintain a graph streaming (gelly-streams) library on Flink that you
>>>> might find interesting [1]. Vasia, another Flink committer is also
>> working
>>>> on that among others.
>>>> You can keep an eye on it since we are planning to use this project as a
>>>> showcase for a new way of doing structured and fixpoint iterations on
>>>> streams in the future.
>>>> 
>>>> P.S. many thanks for sharing your publication, it was an interesting
>> read.
>>>> Do you happen to have your source code public? We could most certainly
>> use
>>>> it in an benchmark soon.
>>>> 
>>>> [1] https://github.com/vasia/gelly-streaming
>>>> 
>>>> 
>>>> On 11 Nov 2016, at 19:18, SHI Xiaogang <shixiaoga...@gmail.com> wrote:
>>>>

Re: [DISCUSS] FLIP-14: Loops API and Termination

2016-11-16 Thread Paris Carbone
Thanks for reviewing, Gyula.

One thing that is still up for discussion is whether we should completely remove 
the old iterations API or simply mark it as deprecated till v2.0.
Also, I am not sure what the best process is now. We have the changes ready. Should 
I copy the FLIP to the wiki and trigger the PRs or wait for a few more days in 
case someone has objections?

@Stephan, what is your take on our interpretation of the approach you 
suggested? Should we proceed or is there anything that you do not find nice?

Paris

> On 15 Nov 2016, at 10:01, Gyula Fóra <gyf...@apache.org> wrote:
> 
> Hi Paris,
> 
> I like the proposed changes to the iteration API, this cleans up things in
> the Java API without any strict restriction I think (it was never a problem
> in the Scala API).
> 
> The termination algorithm based on the proposed scoped loops seems to be
> fairly simple and looks good :)
> 
> Cheers,
> Gyula
> 
> Paris Carbone <par...@kth.se> wrote (on Mon, 14 Nov 2016, 8:50):
> 
>> That would be great Shi! Let's take that offline.
>> 
>> Anyone else interested in the iteration changes? It would be nice to
>> incorporate these to v1.2 if possible so I count on your review asap.
>> 
>> cheers,
>> Paris
>> 
>> On Nov 14, 2016, at 2:59 AM, xiaogang.sxg <xiaogang@alibaba-inc.com> wrote:
>> 
>> Hi Paris
>> 
>> Unfortunately, the project is not public yet.
>> But i can provide you a primitive implementation of the update protocol in
>> the paper. It’s implemented in Storm. Since the protocol assumes the
>> communication channels between different tasks are dual, i think it’s not
>> easy to adapt it to Flink.
>> 
>> Regards
>> Xiaogang
>> 
>> 
>> On 12 Nov 2016, at 3:03 AM, Paris Carbone <par...@kth.se> wrote:
>> 
>> Hi Shi,
>> 
>> Naiad/Timely Dataflow and other projects use global coordination which is
>> very convenient for asynchronous progress tracking in general but it has
>> some downsides in a production systems that count on in-flight
>> transactional control mechanisms and rollback recovery guarantees. This is
>> why we generally prefer decentralized approaches (despite their our
>> downsides).
>> 
>> Regarding synchronous/structured iterations, this is a bit off topic and
>> they are a bit of a different story as you already know.
>> We maintain a graph streaming (gelly-streams) library on Flink that you
>> might find interesting [1]. Vasia, another Flink committer is also working
>> on that among others.
>> You can keep an eye on it since we are planning to use this project as a
>> showcase for a new way of doing structured and fixpoint iterations on
>> streams in the future.
>> 
>> P.S. many thanks for sharing your publication, it was an interesting read.
>> Do you happen to have your source code public? We could most certainly use
>> it in an benchmark soon.
>> 
>> [1] https://github.com/vasia/gelly-streaming
>> 
>> 
>> On 11 Nov 2016, at 19:18, SHI Xiaogang <shixiaoga...@gmail.com> wrote:
>> 
>> Hi, Fouad
>> 
>> Thank you for the explanation. Now the centralized method seems correct to
>> me.
>> The passing of StatusUpdate events will lead to synchronous iterations and
>> we are using the information in each iterations to terminate the
>> computation.
>> 
>> Actually, i prefer the centralized method because in many applications, the
>> convergence may depend on some global statistics.
>> For example, a PageRank program may terminate the computation when 99%
>> vertices are converged.
>> I think those learning programs which cannot reach the fixed-point
>> (oscillating around the fixed-point) can benefit a lot from such features.
>> The decentralized method makes it hard to support such convergence
>> conditions.
>> 
>> 
>> Another concern is that Flink cannot produce periodical results in the
>> iteration over infinite data streams.
>> Take a concrete example. Given an edge stream constructing a graph, the
>> user may need the PageRank weight of each vertex in the graphs formed at
>> certain instants.
>> Currently Flink does not provide any input or iteration information to
>> users, making users hard to implement such real-time iterative
>> applications.
>> Such features are supported in both Naiad and Tornado. I think Flink should
>> support it as well.
>> 
>> What do you think?

Re: [DISCUSS] FLIP-14: Loops API and Termination

2016-11-13 Thread Paris Carbone
That would be great Shi! Let's take that offline.

Anyone else interested in the iteration changes? It would be nice to 
incorporate these into v1.2 if possible, so I count on your review asap.

cheers,
Paris

On Nov 14, 2016, at 2:59 AM, xiaogang.sxg <xiaogang@alibaba-inc.com> wrote:

Hi Paris

Unfortunately, the project is not public yet.
But I can provide you with a primitive implementation of the update protocol in the 
paper. It’s implemented in Storm. Since the protocol assumes the communication 
channels between different tasks are dual, I think it’s not easy to adapt it to 
Flink.

Regards
Xiaogang


On 12 Nov 2016, at 3:03 AM, Paris Carbone <par...@kth.se> wrote:

Hi Shi,

Naiad/Timely Dataflow and other projects use global coordination, which is very 
convenient for asynchronous progress tracking in general, but it has some 
downsides in production systems that count on in-flight transactional control 
mechanisms and rollback recovery guarantees. This is why we generally prefer 
decentralized approaches (despite their own downsides).

Regarding synchronous/structured iterations, this is a bit off topic and they 
are a bit of a different story as you already know.
We maintain a graph streaming (gelly-streams) library on Flink that you might 
find interesting [1]. Vasia, another Flink committer, is also working on that, 
among others.
You can keep an eye on it since we are planning to use this project as a 
showcase for a new way of doing structured and fixpoint iterations on streams 
in the future.

P.S. many thanks for sharing your publication, it was an interesting read. Do 
you happen to have your source code public? We could most certainly use it in 
a benchmark soon.

[1] https://github.com/vasia/gelly-streaming


On 11 Nov 2016, at 19:18, SHI Xiaogang <shixiaoga...@gmail.com> wrote:

Hi, Fouad

Thank you for the explanation. Now the centralized method seems correct to
me.
The passing of StatusUpdate events will lead to synchronous iterations and
we are using the information in each iteration to terminate the
computation.

Actually, I prefer the centralized method because in many applications, the
convergence may depend on some global statistics.
For example, a PageRank program may terminate the computation when 99% of
the vertices have converged.
I think those learning programs which cannot reach the fixed-point
(oscillating around the fixed-point) can benefit a lot from such features.
The decentralized method makes it hard to support such convergence
conditions.
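
A convergence condition of the kind described above, based on a global statistic, can be sketched in a few lines of Java. This is only an illustration under stated assumptions: the class and method names are hypothetical, "converged" is taken to mean that a vertex's rank changed by less than some epsilon between two iterations, and the check assumes one place has access to the ranks of all vertices, which is exactly the global view that the decentralized method does not have.

```java
/** Hypothetical helper: global convergence test over all vertex ranks. */
final class ConvergenceCheck {

    /** Returns the fraction of vertices whose rank changed by less than epsilon. */
    static double convergedFraction(double[] previous, double[] current, double epsilon) {
        if (previous.length != current.length || previous.length == 0) {
            throw new IllegalArgumentException("rank vectors must be non-empty and of equal length");
        }
        int converged = 0;
        for (int i = 0; i < previous.length; i++) {
            if (Math.abs(current[i] - previous[i]) < epsilon) {
                converged++;
            }
        }
        return (double) converged / previous.length;
    }

    /** True when at least requiredFraction (e.g. 0.99) of the vertices have converged. */
    static boolean shouldTerminate(double[] previous, double[] current,
                                   double epsilon, double requiredFraction) {
        return convergedFraction(previous, current, epsilon) >= requiredFraction;
    }
}
```

With four vertices of which one still moves, the converged fraction is 0.75, so a 99% threshold keeps the loop running while a 75% threshold would stop it.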


Another concern is that Flink cannot produce periodic results in the
iteration over infinite data streams.
Take a concrete example. Given an edge stream constructing a graph, the
user may need the PageRank weight of each vertex in the graphs formed at
certain instants.
Currently Flink does not provide any input or iteration information to
users, making it hard for users to implement such real-time iterative applications.
Such features are supported in both Naiad and Tornado. I think Flink should
support it as well.

What do you think?

Regards
Xiaogang


2016-11-11 19:27 GMT+08:00 Fouad ALi <fouad.alsay...@gmail.com>:

Hi Shi,

It seems that you are referring to the centralized algorithm which is no
longer the proposed version.
In the decentralized version (check last doc) there is no master node or
global coordination involved.

Let us keep this discussion to the decentralized one if possible.

To answer your points on the previous approach, there is a catch in your
trace at t7. Here is what is happening :
- Head, as well as RS, will receive a 'BroadcastStatusUpdate' from
runtime (see 2.1 in the steps).
- RS and Heads will broadcast a StatusUpdate event and will not notify their
status.
- When the StatusUpdate event gets back to the head, it will notify its
WORKING status.
Hope that answers your concern.

Best,
Fouad

On Nov 11, 2016, at 6:21 AM, SHI Xiaogang <shixiaoga...@gmail.com> wrote:

Hi Paris

I have several concerns about the correctness of the termination
protocol.
I think the termination protocol puts an end to the computation even when
the computation has not converged.

Suppose there exists a loop context constructed by an OP operator, a Head
operator and a Tail operator (illustrated in Figure 2 in the first
draft).
The stream only contains one record. OP will pass the record to its
downstream operators 10 times. In other words, the loop should iterate 10
times.

If I understood the protocol correctly, the following event sequence may
happen in the computation:
t1:  RS emits Record to OP. Since RS has reached the "end-of-stream", the
system enters into Speculative Phase.
t2:  OP receives Record and emits it to 

Re: [DISCUSS] FLIP-14: Loops API and Termination

2016-11-11 Thread Paris Carbone
synchronized.

I think the protocol follows the idea of the Chandy-Lamport algorithm to
determine a global state.
But the information of whether a node has processed any record since
the last request is not STABLE.
Hence I doubt the correctness of the protocol.

To determine the termination correctly, we need some information that is
stable.
In timely dataflow, Naiad collects the progress made in each iteration and
terminates the loop when little progress is made in an iteration
(identified by the timestamp vector).
The information is stable because the result of an iteration cannot be
changed by the execution of later iterations.

A similar method is also adopted in Tornado.
You may see my paper for more details about the termination of loops:
http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf

Regards
Xiaogang

2016-11-11 3:19 GMT+08:00 Paris Carbone <par...@kth.se>:

Hi again Flink folks,

Here is our new proposal that addresses Job Termination - the loop fault
tolerance proposal will follow shortly.
As Stephan hinted, we need operators to be aware of their scope level.

Thus, it is time we make loops great again! :)

Part of this FLIP basically introduces a new functional, compositional API
for defining asynchronous loops for DataStreams.
This is coupled with a decentralized algorithm for job termination with
loops - along the lines of what Stephan described.
We are already working on the actual prototypes as you can observe in the
links of the doc.

Please let us know if you like (or don't like) it and why, in this mail
discussion.

https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-PfTHtq3173EhsAkpBoQ

cheers
Paris and Fouad

On 31 Oct 2016, at 12:53, Paris Carbone <par...@kth.se> wrote:

Hey Stephan,

Thanks for looking into it!

+1 for breaking this up, will do that.

I can see your point and maybe it makes sense to introduce part of scoping
to incorporate support for nested loops (otherwise it can’t work).
Let us think about this a bit. We will share another draft with a more
detailed description of the approach you are suggesting asap.


On 27 Oct 2016, at 10:55, Stephan Ewen <se...@apache.org> wrote:

How about we break this up into two FLIPs? There are after all two
orthogonal problems (termination, fault tolerance) with quite different
discussion states.

Concerning fault tolerance, I like the ideas.
For the termination proposal, I would like to iterate a bit more.

*Termination algorithm:*

My main concern here is the introduction of a termination coordinator and
any involvement of RPC messages when deciding termination.
That would be such a fundamental break with the current runtime
architecture, and it would make the currently very elegant and simple model
much more complicated and harder to maintain. Given that Flink's runtime is
complex enough, I would really like to avoid that.

The current runtime paradigm coordinates between operators strictly via
in-band events. RPC calls happen between operators and the master for
triggering and acknowledging execution and checkpoints.

I was wondering whether we can keep following that paradigm and still get
most of what you are proposing here. In some sense, all we need to do is
replace RPC calls with in-band events, and "decentralize" the coordinator
such that every operator can make its own termination decision by itself.

This is only a rough sketch, you probably need to flesh it out more.

- I assume that the OP in the diagram knows that it is in a loop and that
it is the one connected to the head and tail

- When OP receives an EndOfStream Event from the regular source (RS), it
emits an "AttemptTermination" event downstream to the operators involved in
the loop. It attaches an attempt sequence number and memorizes that
- Tail and Head forward these events
- When OP receives the event back with the same attempt sequence number,
and no records came in the meantime, it shuts down and emits EndOfStream
downstream
- When other records came back between emitting the AttemptTermination
event and receiving it back, then it emits a new AttemptTermination event
with the next sequence number.
- This should terminate as soon as the loop is empty.

Might this model even generalize to nested loops, where the
"AttemptTermination" event is scoped by the loop's nesting level?

Let me know what you think!


Best,
Stephan


On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <se...@apache.org> wrote:

Hi!

I am still scanning it and compiling some comments. Give me 

[DISCUSS] FLIP-14: Loops API and Termination

2016-11-10 Thread Paris Carbone
Hi again Flink folks,

Here is our new proposal that addresses Job Termination - the loop fault 
tolerance proposal will follow shortly.
As Stephan hinted, we need operators to be aware of their scope level.

Thus, it is time we make loops great again! :)

Part of this FLIP basically introduces a new functional, compositional API for 
defining asynchronous loops for DataStreams.
This is coupled with a decentralized algorithm for job termination with loops - 
along the lines of what Stephan described.
We are already working on the actual prototypes as you can observe in the links 
of the doc.

Please let us know if you like (or don't like) it and why, in this mail 
discussion.

https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-PfTHtq3173EhsAkpBoQ

cheers
Paris and Fouad

On 31 Oct 2016, at 12:53, Paris Carbone <par...@kth.se> wrote:

Hey Stephan,

Thanks for looking into it!

+1 for breaking this up, will do that.

I can see your point and maybe it makes sense to introduce part of scoping to 
incorporate support for nested loops (otherwise it can’t work).
Let us think about this a bit. We will share another draft with a more detailed 
description of the approach you are suggesting asap.


On 27 Oct 2016, at 10:55, Stephan Ewen <se...@apache.org> wrote:

How about we break this up into two FLIPs? There are after all two
orthogonal problems (termination, fault tolerance) with quite different
discussion states.

Concerning fault tolerance, I like the ideas.
For the termination proposal, I would like to iterate a bit more.

*Termination algorithm:*

My main concern here is the introduction of a termination coordinator and
any involvement of RPC messages when deciding termination.
That would be such a fundamental break with the current runtime
architecture, and it would make the currently very elegant and simple model
much more complicated and harder to maintain. Given that Flink's runtime is
complex enough, I would really like to avoid that.

The current runtime paradigm coordinates between operators strictly via
in-band events. RPC calls happen between operators and the master for
triggering and acknowledging execution and checkpoints.

I was wondering whether we can keep following that paradigm and still get
most of what you are proposing here. In some sense, all we need to do is
replace RPC calls with in-band events, and "decentralize" the coordinator
such that every operator can make its own termination decision by itself.

This is only a rough sketch, you probably need to flesh it out more.

- I assume that the OP in the diagram knows that it is in a loop and that
it is the one connected to the head and tail

- When OP receives an EndOfStream event from the regular source (RS), it
emits an "AttemptTermination" event downstream to the operators involved in
the loop. It attaches an attempt sequence number and memorizes that
- Tail and Head forward these events
- When OP receives the event back with the same attempt sequence number,
and no records came in the meantime, it shuts down and emits EndOfStream
downstream
- When other records came back between emitting the AttemptTermination
event and receiving it back, then it emits a new AttemptTermination event
with the next sequence number.
- This should terminate as soon as the loop is empty.
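
The steps above can be sketched as a small single-process simulation. This is
only an illustration under assumptions, not Flink code: the feedback channel
is modelled as one FIFO queue, Tail and Head forwarding is implicit, and the
names `AttemptTermination`, `run_until_terminated` and `step` are made up here.

```python
from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class AttemptTermination:
    """In-band control event carrying the attempt sequence number."""
    seq: int


def run_until_terminated(records_in_loop, step):
    """Simulate OP after it has received EndOfStream from the regular source.

    records_in_loop: records still circulating in the loop at that moment.
    step: hypothetical user function; maps a record to the list of records
          it feeds back into the loop (empty list = record leaves the loop).
    Returns how many AttemptTermination events OP emitted before shutdown.
    """
    loop = deque(records_in_loop)         # FIFO feedback channel back to OP
    seq = 0                               # sequence number memorized by OP
    loop.append(AttemptTermination(seq))  # first attempt, sent in-band
    saw_record = False                    # any record since the last attempt?

    while True:
        item = loop.popleft()
        if isinstance(item, AttemptTermination):
            if item.seq == seq and not saw_record:
                # Event came back and nothing else did: the loop is empty.
                # OP would shut down here and emit EndOfStream downstream.
                return seq + 1
            # Records came back in the meantime: retry with the next number.
            seq += 1
            saw_record = False
            loop.append(AttemptTermination(seq))
        else:
            saw_record = True
            loop.extend(step(item))       # process and feed results back
```

With an empty loop the first attempt already succeeds; with records that
circulate a few times, OP keeps retrying until one full round trip carries no
records. No coordinator and no RPC is involved in the decision.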

Might this model even generalize to nested loops, where the
"AttemptTermination" event is scoped by the loop's nesting level?

Let me know what you think!


Best,
Stephan


On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <se...@apache.org> wrote:

Hi!

I am still scanning it and compiling some comments. Give me a bit ;-)

Stephan


On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <par...@kth.se> wrote:

Hey all,

Now that many of you have already scanned the document (judging from the
views) maybe it is time to give back some feedback!
Did you like it? Would you suggest an improvement?

I would suggest not to leave this in the void. It has to do with
important properties that the system promises to provide.
Fouad and I will do our best to answer your questions and discuss this
further.

cheers
Paris

On 21 Oct 2016, at 08:54, Paris Carbone <par...@kth.se> wrote:

Hello everyone,

Loops in Apache Flink have good potential to become a much more
powerful feature in future versions of Apache Flink.
There is generally high demand to make them usable and first of all
production-ready for upcoming releases.

As a first commitment we would like to propose FLIP-13 for consistent
processing with Loops.
We are also working on scoped loops for Q1 2017 which we can share if
there is enough interest.

For now, that is an improvement proposal that solves two pending major
issues:

1) The (not so trivial) problem of correc

Re: [DISCUSS] FLIP-13: Consistent Processing with Loops

2016-10-31 Thread Paris Carbone
Hey Stephan,

Thanks for looking into it!

+1 for breaking this up, will do that.

I can see your point and maybe it makes sense to introduce part of scoping to 
incorporate support for nested loops (otherwise it can’t work).
Let us think about this a bit. We will share another draft with a more detailed 
description of the approach you are suggesting asap.


> On 27 Oct 2016, at 10:55, Stephan Ewen <se...@apache.org> wrote:
> 
> How about we break this up into two FLIPs? There are after all two
> orthogonal problems (termination, fault tolerance) with quite different
> discussion states.
> 
> Concerning fault tolerance, I like the ideas.
> For the termination proposal, I would like to iterate a bit more.
> 
> *Termination algorithm:*
> 
> My main concern here is the introduction of a termination coordinator and
> any involvement of RPC messages when deciding termination.
> That would be such a fundamental break with the current runtime
> architecture, and it would make the currently very elegant and simple model
> much more complicated and harder to maintain. Given that Flink's runtime is
> complex enough, I would really like to avoid that.
> 
> The current runtime paradigm coordinates between operators strictly via
> in-band events. RPC calls happen between operators and the master for
> triggering and acknowledging execution and checkpoints.
> 
> I was wondering whether we can keep following that paradigm and still get
> most of what you are proposing here. In some sense, all we need to do is
> replace RPC calls with in-band events, and "decentralize" the coordinator
> such that every operator can make its own termination decision by itself.
> 
> This is only a rough sketch, you probably need to flesh it out more.
> 
>  - I assume that the OP in the diagram knows that it is in a loop and that
> it is the one connected to the head and tail
> 
>  - When OP receives an EndOfStream event from the regular source (RS), it
> emits an "AttemptTermination" event downstream to the operators involved in
> the loop. It attaches an attempt sequence number and memorizes that
>  - Tail and Head forward these events
>  - When OP receives the event back with the same attempt sequence number,
> and no records came in the meantime, it shuts down and emits EndOfStream
> downstream
>  - When other records came back between emitting the AttemptTermination
> event and receiving it back, then it emits a new AttemptTermination event
> with the next sequence number.
>  - This should terminate as soon as the loop is empty.
> 
> Might this model even generalize to nested loops, where the
> "AttemptTermination" event is scoped by the loop's nesting level?
> 
> Let me know what you think!
> 
> 
> Best,
> Stephan
> 
> 
> On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <se...@apache.org> wrote:
> 
>> Hi!
>> 
>> I am still scanning it and compiling some comments. Give me a bit ;-)
>> 
>> Stephan
>> 
>> 
>> On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <par...@kth.se> wrote:
>> 
>>> Hey all,
>>> 
>>> Now that many of you have already scanned the document (judging from the
>>> views) maybe it is time to give back some feedback!
>>> Did you like it? Would you suggest an improvement?
>>> 
>>> I would suggest not to leave this in the void. It has to do with
>>> important properties that the system promises to provide.
>>> Fouad and I will do our best to answer your questions and discuss this
>>> further.
>>> 
>>> cheers
>>> Paris
>>> 
>>> On 21 Oct 2016, at 08:54, Paris Carbone <par...@kth.se> wrote:
>>> 
>>> Hello everyone,
>>> 
>>> Loops in Apache Flink have good potential to become a much more
>>> powerful feature in future versions of Apache Flink.
>>> There is generally high demand to make them usable and first of all
>>> production-ready for upcoming releases.
>>> 
>>> As a first commitment we would like to propose FLIP-13 for consistent
>>> processing with Loops.
>>> We are also working on scoped loops for Q1 2017 which we can share if
>>> there is enough interest.
>>> 
>>> For now, that is an improvement proposal that solves two pending major
>>> issues:
>>> 
>>> 1) The (not so trivial) problem of correct termination of jobs with
>>> iterations
>>> 2) The applicability of the checkpointing algorithm to iterative dataflow
>>> graphs.
>>> 
>>> We would really appreciate it if you go through the linked draft
>>> (motivation and proposed changes) for FLIP-13 and point out comments,
>>> preferably publicly in this devlist discussion before we go ahead and
>>> update the wiki.
>>> 
>>> https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing
>>> 
>>> cheers
>>> 
>>> Paris and Fouad
>>> 
>>> 
>>> 
>> 



Re: [DISCUSS] FLIP-13: Consistent Processing with Loops

2016-10-26 Thread Paris Carbone
Hey all,

Now that many of you have already scanned the document (judging from the views) 
maybe it is time to give back some feedback!
Did you like it? Would you suggest an improvement?

I would suggest not to leave this in the void. It has to do with important 
properties that the system promises to provide.
Fouad and I will do our best to answer your questions and discuss this further.

cheers
Paris

On 21 Oct 2016, at 08:54, Paris Carbone <par...@kth.se> wrote:

Hello everyone,

Loops in Apache Flink have good potential to become a much more powerful 
feature in future versions of Apache Flink.
There is generally high demand to make them usable and first of all 
production-ready for upcoming releases.

As a first commitment we would like to propose FLIP-13 for consistent 
processing with Loops.
We are also working on scoped loops for Q1 2017 which we can share if there is 
enough interest.

For now, that is an improvement proposal that solves two pending major issues:

1) The (not so trivial) problem of correct termination of jobs with iterations
2) The applicability of the checkpointing algorithm to iterative dataflow 
graphs.

We would really appreciate it if you go through the linked draft (motivation 
and proposed changes) for FLIP-13 and point out comments, preferably publicly 
in this devlist discussion before we go ahead and update the wiki.

https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing

cheers

Paris and Fouad




[DISCUSS] FLIP-13: Consistent Processing with Loops

2016-10-21 Thread Paris Carbone
Hello everyone,

Loops in Apache Flink have good potential to become a much more powerful 
feature in future versions of Apache Flink.
There is generally high demand to make them usable and first of all 
production-ready for upcoming releases.

As a first commitment we would like to propose FLIP-13 for consistent 
processing with Loops.
We are also working on scoped loops for Q1 2017 which we can share if there is 
enough interest.

For now, that is an improvement proposal that solves two pending major issues:

1) The (not so trivial) problem of correct termination of jobs with iterations
2) The applicability of the checkpointing algorithm to iterative dataflow 
graphs.

We would really appreciate it if you go through the linked draft (motivation 
and proposed changes) for FLIP-13 and point out comments, preferably publicly 
in this devlist discussion before we go ahead and update the wiki.

https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing

cheers

Paris and Fouad



Re: [DISCUSS] FLIP-1 : Fine grained recovery from task failures

2016-07-15 Thread Paris Carbone
+1 

I like the proposal!

Docs can work as a supplementary medium to the wiki sometimes, in my view. 
The wiki is definitely much preferred for tracking/archiving purposes but it 
might not hurt if we sometimes also reference a doc with a somewhat evolving 
prototype there.

> On 15 Jul 2016, at 15:37, Stephan Ewen  wrote:
> 
> So far the reactions were positive towards the proposal.
> If no one objects, I am going to assume lazy consensus and mark this FLIP
> as adopted.
> 
> For the process, we'll post future FLIPs in the wiki directly, rather than
> in a Google Doc first.
> 
> On Wed, Jul 13, 2016 at 5:17 PM, Aljoscha Krettek 
> wrote:
> 
>> +1
>> This was my original motivation. I saw a lot of discussion on the design
>> docs I initiated but none of this was reflected on the ML.
>> 
>> On Wed, 13 Jul 2016 at 17:11 Robert Metzger  wrote:
>> 
>>> Thank you for writing the proposal!
>>> 
>>> I agree with Chesnay. I thought one of the main motivations of the new
>> FLIP
>>> process is to move the discussion onto ASF infra, to record the decision
>>> making process on the mailing list and to make ongoing activities more
>>> visible to the community.
>>> 
>>> If we want to use google docs because of the commenting features, we
>> should
>>> consider implementing a tool mirroring the comments onto the mailing
>> lists.
>>> 
>>> If you want to change the FLIP process, I would suggest to move the
>>> discussion about Google Docs to the FLIP 1 thread.
>>> 
>>> 
>>> 
>>> On Wed, Jul 13, 2016 at 2:07 PM, Stephan Ewen  wrote:
>>> 
>>>> I deliberately kept it initially in the Google Doc because it is easier to
>>>> comment.
>>>>
>>>> I would move it to the wiki once we have passed the initial discussion and
>>>> have converged (or are close to convergence) on the design.
>>>>
>>>> On Tue, Jul 12, 2016 at 8:11 PM, Chesnay Schepler wrote:
>>>>
>>>>> shouldn't the proposal be contained in the wiki instead of GoogleDocs?
>>>>>
>>>>>
>>>>> On 12.07.2016 19:55, Stephan Ewen wrote:
>>>>>
>>>>>> Hi all!
>>>>>>
>>>>>> Here is the very first FLIP (FLink Improvement Proposal): Fine grained
>>>>>> recovery from task failures
>>>>>>
>>>>>> It describes a proposed enhancement for reducing the work done during
>>>>>> recovery.
>>>>>>
>>>>>> https://docs.google.com/document/d/16S584XFzkfFu3MOfVCE0rHZ_JJgQrQuw9SXpanoMiMo
>>>>>>
>>>>>> Please comment in this mail thread, or in the GoogleDoc.
>>>>>>
>>>>>> Best,
>>>>>> Stephan
>>>>>>
>>>>>
>>>>
>>>
>>



Re: sampling function

2016-07-12 Thread Paris Carbone
Hey Do,

I think that more sophisticated samplers could make a better fit in the ML 
library and not in the core API but I am not very familiar with the milestones 
there.
Maybe the maintainers of the batch ML library could check if sampling 
techniques could be useful there I guess.

Paris

> On 11 Jul 2016, at 16:15, Le Quoc Do  wrote:
> 
> Hi all,
> 
> Thank you all for your answers.
> By the way, I also noticed that Flink doesn't support a "stratified
> sampling" function (only simple random sampling) for DataSet.
> It would be nice if someone could create a Jira for it and assign the task
> to me so that I can work on it.
> 
> Thank you,
> Do
> 
> On Mon, Jul 11, 2016 at 11:44 AM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
> 
>> Hi Do,
>> 
>> Paris and Martha worked on sampling techniques for data streams on Flink
>> last year. If you want to implement your own samplers, you might find
>> Martha's master thesis helpful [1].
>> 
>> -Vasia.
>> 
>> [1]: http://kth.diva-portal.org/smash/get/diva2:910695/FULLTEXT01.pdf
>> 
>> On 11 July 2016 at 11:31, Kostas Kloudas 
>> wrote:
>> 
>>> Hi Do,
>>> 
>>> In DataStream you can always implement your own
>>> sampling function, hopefully without too much effort.
>>> 
>>> Adding such functionality to the API could be a good idea.
>>> But given that in sampling there is no “one-size-fits-all”
>>> solution (as not every use case needs random sampling and not
>>> all random samplers fit to all workloads), I am not sure if we
>>> should start adding different sampling operators.
>>> 
>>> Thanks,
>>> Kostas
>>> 
>>>> On Jul 9, 2016, at 5:43 PM, Greg Hogan wrote:
>>>>
>>>> Hi Do,
>>>>
>>>> DataSet provides a stable @Public interface. DataSetUtils is marked
>>>> @PublicEvolving which is intended for public use, has stable behavior, but
>>>> method signatures may change. It's also good to limit DataSet to common
>>>> methods whereas the utility methods tend to be used for specific
>>>> applications.
>>>>
>>>> I don't have the pulse of streaming but this sounds like a useful feature
>>>> that could be added.
>>>>
>>>> Greg
>>>>
>>>> On Sat, Jul 9, 2016 at 10:47 AM, Le Quoc Do wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm working on approximate computing using sampling techniques. I
>>>>> recognized that Flink supports the sample function for Dataset
>>>>> (org/apache/flink/api/java/utils/DataSetUtils.java). I'm just wondering why
>>>>> you didn't merge the function to org/apache/flink/api/java/DataSet.java
>>>>> since the sample function works as a transformation operator?
>>>>>
>>>>> The second question is that are you planning to support the sample
>>>>> function for DataStream (within windows) since I did not see it in
>>>>> DataStream code ?
>>>>>
>>>>> Thank you,
>>>>> Do
>>>>>
>>>
>>>
>>



Re: FYI: Updated Slides Section

2016-04-04 Thread Paris Carbone
Some people might find my slides on the FT fundamentals from last summer 
interesting. If you like it feel free to include it.

http://www.slideshare.net/ParisCarbone/tech-talk-google-on-flink-fault-tolerance-and-ha

Paris

On 04 Apr 2016, at 11:33, Ufuk Celebi wrote:

Dear Flink community,

I have updated the Material section on the Flink project page and
moved the slides section to a separate page.

You can find links to slides and talks here now:
http://flink.apache.org/slides.html

I've added slides for talks from this year by Till Rohrmann, Vasia
Kalavri, Robert Metzger, Jamie Grier and Kostas Tzoumas. If you think
that something is missing, feel free to ping in this thread.

– Ufuk



Re: [ANNOUNCE] Chengxiang Li added as committer

2016-01-19 Thread Paris Carbone
Congrats Chengxiang! Really pleased to have you on board

> On 19 Jan 2016, at 13:16, Matthias J. Sax  wrote:
> 
> Congrats and welcome Chengxiang!! :)
> 
> On 01/19/2016 12:56 PM, Kostas Tzoumas wrote:
>> Welcome Chengxiang!!
>> 
>> On Tue, Jan 19, 2016 at 12:31 PM, Stephan Ewen  wrote:
>> 
>>> Good to have you on board!
>>> 
>>> On Tue, Jan 19, 2016 at 11:29 AM, Maximilian Michels 
>>> wrote:
>>> 
>>>> Pleased to have you with us Chengxiang!
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On Tue, Jan 19, 2016 at 11:13 AM, Chiwan Park wrote:
>>>>> Congrats! Welcome Chengxiang Li!
>>>>>
>>>>>> On Jan 19, 2016, at 7:13 PM, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:
>>>>>>
>>>>>> Congratulations! Welcome Chengxiang Li!
>>>>>>
>>>>>> On 19 January 2016 at 11:02, Fabian Hueske wrote:
>>>>>>
>>>>>>> Hi everybody,
>>>>>>>
>>>>>>> I'd like to announce that Chengxiang Li accepted the PMC's offer to become
>>>>>>> a committer of the Apache Flink project.
>>>>>>>
>>>>>>> Please join me in welcoming Chengxiang Li!
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>
>>>>> Regards,
>>>>> Chiwan Park
>>>>>
>>>>
>>>
>>
>



[jira] [Created] (FLINK-3256) Invalid execution graph cleanup for jobs with colocation groups

2016-01-18 Thread Paris Carbone (JIRA)
Paris Carbone created FLINK-3256:


 Summary: Invalid execution graph cleanup for jobs with colocation 
groups
 Key: FLINK-3256
 URL: https://issues.apache.org/jira/browse/FLINK-3256
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Reporter: Paris Carbone
Assignee: Paris Carbone
Priority: Blocker


Currently, upon restarting an execution graph, we clean up the colocation 
constraints for each group present in an ExecutionJobVertex respectively.

This can lead to invalid reconfiguration upon a restart or any other activity 
that relies on state cleanup of the execution graph. For example, upon 
restarting a DataStream job with iterations the following steps are executed:

1) IterationSource colocation group constraints are reset
2) New IterationSource colocation group constraints are generated
3) IterationSource subtasks are scheduled with current colocation constraints
4) IterationSink colocation group constraints are reset
5) New IterationSink colocation group constraints are generated
6) IterationSink subtasks are scheduled with different colocation constraints, 
thus, not being colocated with sources while also demanding more slots from the 
scheduler.

This can be trivially fixed by resetting colocation groups independently from 
ExecutionJobVertices, thus updating them once per reconfiguration.
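
The difference between the two reset orders can be illustrated with a toy
model. This is a sketch under assumptions, not the Flink scheduler: the
classes `CoLocationGroup` and `JobVertex` and both restart functions are
hypothetical stand-ins, and a "generation" counter stands in for a freshly
generated set of colocation constraints.

```python
import itertools


class CoLocationGroup:
    """Toy colocation group; each reset creates a new constraint generation."""
    _generations = itertools.count()

    def __init__(self):
        self.generation = next(self._generations)

    def reset(self):
        # Stand-in for "constraints are reset and new ones are generated".
        self.generation = next(self._generations)


class JobVertex:
    """Toy vertex that remembers which constraints it was scheduled with."""

    def __init__(self, name, group):
        self.name = name
        self.group = group
        self.scheduled_with = None

    def schedule(self):
        self.scheduled_with = self.group.generation


def restart_resetting_per_vertex(vertices):
    # Order described in the issue: reset the group, then schedule, per vertex.
    for vertex in vertices:
        vertex.group.reset()
        vertex.schedule()


def restart_resetting_once_per_group(vertices):
    # Proposed order: reset every distinct group once, then schedule everything.
    distinct_groups = {id(v.group): v.group for v in vertices}.values()
    for group in distinct_groups:
        group.reset()
    for vertex in vertices:
        vertex.schedule()
```

In the first variant an iteration source and sink that share a group end up
scheduled against different constraint generations; in the second they see
the same one.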



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-01-18 Thread Paris Carbone (JIRA)
Paris Carbone created FLINK-3257:


 Summary: Add Exactly-Once Processing Guarantees in Iterative 
DataStream Jobs
 Key: FLINK-3257
 URL: https://issues.apache.org/jira/browse/FLINK-3257
 Project: Flink
  Issue Type: Improvement
Reporter: Paris Carbone
Assignee: Paris Carbone


The current snapshotting algorithm cannot support cycles in the execution 
graph. An alternative scheme can potentially include records in-transit through 
the back-edges of a cyclic execution graph (ABS [1]) to achieve the same 
guarantees.

One straightforward implementation of ABS for cyclic graphs could work along the 
following lines:

1) Upon triggering a barrier in an IterationHead from the TaskManager, block 
its output and start an upstream backup of all records forwarded from the 
respective IterationSink.

2) The IterationSink should eventually forward the current snapshotting epoch 
barrier to the IterationSource.

3) Upon receiving a barrier from the IterationSink, the IterationSource should 
finalize the snapshot, unblock its output and emit all records in-transit in 
FIFO order and continue the usual execution.
--
Upon restart the IterationSource should emit all records from the injected 
snapshot first and then continue its usual execution.
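
A minimal sketch of the steps above, assuming a single IterationSource with an
in-memory snapshot store. The class and method names are hypothetical and only
mirror the description; forwarding the barrier to downstream operators and
persisting the snapshot are left out.

```python
class IterationSource:
    """Toy iteration source that backs up in-transit records during a snapshot."""

    def __init__(self, emit):
        self.emit = emit        # callback that sends a record downstream
        self.epoch = None       # snapshot epoch in progress, if any
        self.backup = []        # records received from the sink while blocked
        self.snapshots = {}     # epoch -> list of in-transit records

    def on_barrier_from_task_manager(self, epoch):
        # Step 1: block output and start backing up records from the sink.
        self.epoch = epoch
        self.backup = []

    def on_record_from_sink(self, record):
        if self.epoch is not None:
            self.backup.append(record)   # held back, part of the snapshot
        else:
            self.emit(record)            # usual execution

    def on_barrier_from_sink(self, epoch):
        # Step 3: the barrier travelled through the cycle; finalize, unblock
        # and replay the backed-up records in FIFO order.
        if epoch != self.epoch:
            return
        self.snapshots[epoch] = list(self.backup)
        self.epoch = None
        for record in self.backup:
            self.emit(record)
        self.backup = []

    def restore(self, epoch):
        # Upon restart: emit the records of the injected snapshot first.
        for record in self.snapshots[epoch]:
            self.emit(record)
```

The point of the sketch is that the snapshot of the source is exactly the
list of records that were in transit on the back-edge between the two
barriers, so replaying that list on restart restores the loop contents.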

Several optimisations and slight variations are possible, but 
this can serve as the initial implementation.

[1] http://arxiv.org/abs/1506.08603



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Add CEP library to Flink

2016-01-10 Thread Paris Carbone
+1 for the cool design proposal. I also agree with Stephan’s point to focus on 
the Pattern operator.
Ultimately in the future this could be merged into the SQL lib.  There are a 
few “standards” you could check out such as Oracle’s Pattern Matching extension 
on SQL [1], apart from EPL.

[1] https://docs.oracle.com/database/121/DWHSG/pattern.htm#DWHSG8956



On 10 Jan 2016, at 02:46, Henry Saputra wrote:

Hi Till,

Have you created a JIRA ticket to keep track of this proposed new feature?

We should create one to keep track of updates on the effort.

Thanks,

Henry

On Fri, Jan 8, 2016 at 6:54 AM, Till Rohrmann wrote:
Hi everybody,

recently we've seen an increased interest in complex event processing (CEP)
by Flink users. Even though most functionality is already there to solve
many use cases it would still be helpful for most users to have an easy to
use library. Having such a library which allows to define complex event
patterns would increase Flink's user range to the CEP community. Once
having laid the foundation, I'm optimistic that people will quickly pick it
up and further extend it.

The major contribution of this library would be to add an efficient
non-deterministic finite automaton which can detect complex event patterns.
For everything else, Flink already has most of the functionality in place.

I've drafted a design document for the first version. Please review it and
comment:

https://docs.google.com/document/d/15iaBCZkNcpqSma_qrF0GUyobKV_JttEDVuhNd0Y1aAU/edit?usp=sharing

Thanks,
Till



Re: Incremental checkpoints for Flink

2015-12-21 Thread Paris Carbone
Hi Marius,

This is a pretty good and quite challenging topic for a thesis! Your thoughts 
are pretty much aligned with strategies that we have discussed in the past.
There are some initial steps in the making in that direction. For 
example, Aljoscha proposed specialized operator state (KVState) types earlier 
this month (e.g. Maps, Lists) that can potentially support incremental 
snapshotting.  I think that the incremental snapshotting strategy fits pretty 
well with mutable backend storages (e.g. sql databases, kv stores etc). From a 
quick look, option I is close to what most of us have in mind I guess. The 
second option is quite tricky since it is not always possible to define what a 
“diff” is.

If you want to get a more rigorous overview of how the snapshotting mechanism 
works you can take a look at the arXiv paper [1] we submitted earlier this 
year, but also blogs, docs [2,3] and technical presentations [4] on that 
subject.

In case you decide to go for this topic it will be good to check the 
implementation of the current state coordination and sync with us on what is 
the best way to proceed. The CheckpointCoordinator is a good starting point 
[5]. I would also be happy to co-supervise you or direct you if you want!

cheers
Paris


[1] http://arxiv.org/abs/1506.08603
[2] https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
[3] http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
[4] http://www.slideshare.net/ParisCarbone/tech-talk-google-on-flink-fault-tolerance-and-ha
[5] https://github.com/apache/flink/blob/55fd5f32d7ef0292a01192ab08456fae49b91791/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java


On 19 Dec 2015, at 17:04, Marius Melzer wrote:

Hello everyone,

in order to look for a subject for my diploma thesis, I was at the Flink
Forward conference in October. I talked to Aljoscha and some others
there and after the Fault Tolerance talk on day 2 I arrived at the idea
that an incremental checkpointing of the state of a process when a
barrier arrives would probably be a nice feature for Flink and a good
topic for my thesis. This would especially be interesting for very
large, e.g. key-value based, states that are necessary for scenarios
like decentralised material views ([1], [2]). Independently, Asterios
from the TU Berlin suggested to me the same topic when I met him. This
is nothing new, e.g. Apache Samza does incremental backup of internal
state as well by writing every change to a special Kafka topic from
which it can be restored when something fails. The approach for Flink
would rather be an adaptation of the current checkpointing mechanism.

So my main questions are:
* would incremental checkpoints be an appreciated change and do you
think it would fit a diploma thesis by the effort that's necessary?
* is there already someone working in this area?

I already put some initial thoughts into how it might be possible to
achieve the goal:


How to checkpoint:
(I) Memorize which changes have been made after last checkpoint
 - Pro: Lightweight solution, since only the things that changed need
to be compressed and transferred
 - Contra: You would want to support this not only for each "state
variable" but also inside them, e.g. for lists, key-value structures,
everything. Unfortunately there doesn't seem to be the possibility to
observe changes made on plain java collections or objects in general (or
is there?). So you would need to use a different collection library or a
wrapper around the existing java standard ones.
 - I could imagine the checkpointing somehow like this:
   (1) The programmer of the transformation (with state) uses for the
OperatorState a (wrapped) collection/other type that implements a
certain interface (e.g. "IncrementallyCheckpointed") that demands
something like a changesSinceLastCheckpoint() function
   (2) The flink runtime would check if the state is implementing
IncrementallyCheckpointed and if yes, calls the
changesSinceLastCheckpoint() function.
   (3) There would be the need to differentiate between "regular/full"
checkpoints of a state and "incremental" ones when transferring the
checkpoint to the backup/checkpoint server.

(II) Keep last state and make a diff (preferably with the already
serialised checkpoint):
 - Pro: Much easier solution, doesn't need wrapping or adapting of
collections or other types, very general approach, the transferred data
shouldn't be more than in case (I) - maybe in some cases even less
 - Contra: Would usually almost double the memory needs of the
transformation, for large collections this would also mean quite some
processing effort for computing the diff

(III?) Is there another kind of approach you could imagine?
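
Option (I) can be sketched with a small wrapper that records which keys were
touched since the last checkpoint. This is only an illustration under
assumptions: `IncrementalMapState`, `apply_delta` and the tombstone marker are
hypothetical names, and `changes_since_last_checkpoint()` merely mirrors the
changesSinceLastCheckpoint() function suggested above.

```python
_DELETED = object()   # tombstone marking a key removed since the last checkpoint


class IncrementalMapState:
    """Key-value state that remembers what changed since the last checkpoint."""

    def __init__(self):
        self._data = {}
        self._dirty = {}   # key -> new value, or _DELETED

    def put(self, key, value):
        self._data[key] = value
        self._dirty[key] = value

    def remove(self, key):
        self._data.pop(key, None)
        self._dirty[key] = _DELETED

    def get(self, key, default=None):
        return self._data.get(key, default)

    def full_snapshot(self):
        """Regular/full checkpoint: copy everything and clear the change log."""
        self._dirty = {}
        return dict(self._data)

    def changes_since_last_checkpoint(self):
        """Incremental checkpoint: hand out only the changes and clear the log."""
        delta, self._dirty = self._dirty, {}
        return delta


def apply_delta(base, delta):
    """Restore by applying one incremental checkpoint on top of a base snapshot."""
    restored = dict(base)
    for key, value in delta.items():
        if value is _DELETED:
            restored.pop(key, None)
        else:
            restored[key] = value
    return restored
```

The runtime would then need to distinguish the two checkpoint kinds, as noted
in point (3): a restore starts from the latest full snapshot and applies the
later deltas in order.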

Which parts need change:
 - The checkpointing of the transformation state (but not the restoring
of the state, this stays the 

Re: [DISCUSS] Improving State/Timers/Windows

2015-12-14 Thread Paris Carbone
+1 to all changes proposed, that is a reasonable step towards incremental 
snapshots and proper reconfiguration support. What is more interesting though 
are the actual implementations of the KVState derivatives, I am looking forward 
to seeing what you have in mind there. The operator/UDF KV namespace separation is 
not really a problem. We can just prefix the keys with a specific namespace ID. 
I would also suggest abstracting a memory management layer between KVStates and 
backend snapshots so we can plug in different caching strategies.
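
A minimal sketch of the namespace-prefix idea, assuming a plain dictionary as
the shared backend. `NamespacedKvState` and the namespace IDs are hypothetical;
a tuple is used as the composite key so that a namespace ID can never be
confused with part of a user key.

```python
class NamespacedKvState:
    """View on a shared key-value backend, scoped by a namespace ID."""

    def __init__(self, backend, namespace_id):
        self._backend = backend          # shared store, here just a dict
        self._namespace_id = namespace_id

    def _scoped(self, key):
        # Prefix every key with the namespace ID of this view.
        return (self._namespace_id, key)

    def put(self, key, value):
        self._backend[self._scoped(key)] = value

    def get(self, key, default=None):
        return self._backend.get(self._scoped(key), default)
```

An operator and a user function can then both use the key "count" on the same
backend without clashing.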


> On 14 Dec 2015, at 11:14, Kostas Tzoumas  wrote:
> 
> I suppose that they can start as sugar and evolve to a different
> implementation.
> 
> I would +1 the name change to KVState, OperatorState is indeed somewhat
> confusing, and it will only get harder to rename later.
> 
> On Mon, Dec 14, 2015 at 11:09 AM, Gyula Fóra  wrote:
> 
>> Would the Reducing/Folding states just be some API sugar on top of what we
>> have now (ValueState) or does it have some added functionality (like
>> incremental checkpoints for list states)?
>> 
>> Gyula
>> 
>> Aljoscha Krettek wrote (on Mon, 14 Dec 2015, 11:03):
>> 
>>> While enhancing the state interfaces we would also need to introduce new
>>> types of state. I was thinking of these, for a start:
>>> - ValueState (works like OperatorState works now, i.e. provides methods
>>> to get/set one state value)
>>> - ListState, provides methods to add one element to a list of elements and
>>> to iterate over all contained elements
>>> - ReducingState, somewhat similar to value state but combines the added
>>> value to the existing value using a ReduceFunction
>>> - FoldingState, same as above but with fold
>>> 
>>> I think these are necessary to give the system more knowledge about the
>>> semantics of state so that it can handle the state more efficiently.
>> Think
>>> of incremental checkpoints, for example, these are easy to do if you know
>>> that state is a list to which stuff is only appended.
 On 14 Dec 2015, at 10:52, Stephan Ewen  wrote:
 
 A lot of this makes sense, but I am not sure about renaming
 "OperatorState". The other name is nicer, but why make users' life hard
 just for a name?
 
 
 On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels 
>>> wrote:
 
> Hi Aljoscha,
> 
> Thanks for the informative technical description.
> 
>> - function state: this is the state that you get when a user function
> implements the Checkpointed interface. it is not partitioned
>> - operator state: This is the state that a StreamOperator can
>> snapshot,
> it is similar to the function state, but for operators. it is not
> partitioned
>> - partitioned state: state that is scoped to the key of the incoming
> element, in Flink, this is (confusingly) called OperatorState and
>>> KvState
> (internally)
> 
> Let's clean that up! Let's rename the OperatorState interface to
>>> KvState.
> 
>> Both stream operators and user functions can have partitioned state,
>>> and
> the namespace is the same, i.e. the state can clash. The partitioned
>>> state
> will stay indefinitely if not manually cleared.
> 
> I suppose operators currently have to take care to use a unique
> identifier for the state such that it doesn't clash with the user
> function. Wouldn't be too hard to introduce a scoping here.
> 
> Your proposal makes sense. It seems like this is a rather delicate
> change which improves the flexibility of the streaming API. What is
> the motivation behind this? I suppose you are thinking of improvements
> to the session capabilities of the streaming API.
> 
>> If we want to also implement the current WindowOperator on top of
>> these
> generic facilities we need to have a way to scope state not only by
>> key
>>> but
> also by windows (or better, some generic state scope).
> 
> This is currently handled by the WindowOperator itself and would then
> be delegated to the enhanced state interface? Makes sense if we want
> to make use of the new state interface. Again, is it just cleaner or
> does this enable new type of applications?
> 
> Cheers,
> Max
> 
> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <
>> aljos...@apache.org>
> wrote:
>> Hi All,
>> I want to discuss some ideas about improving the
>> primitives/operations
> that Flink offers for user-state, timers and windows and how these
>>> concepts
> can be unified.
>> 
>> It has come up a lot lately that people have very specific
>> requirements
> regarding the state that they keep and it seems necessary to allow
>>> users
> to set their own custom timers (on processing time and watermark time
> (event-time)) to do both expiration of state and implementation 

Re: Iteration feedback partitioning does not work properly

2015-10-08 Thread Paris Carbone
Yes it does break it since it is based on backwards partitioning preservation 
which was the case before Aljoscha’s refactoring. I will focus on a 0.10 patch 
for the samoa connector right after the 0.10 release to see how we can do this.

To be honest the whole thing confuses me a bit. From my understanding in the 
discussion so far in order to fix this I will need to repartition the preceding 
operator using an identity mapper before applying its subscription in the flink 
topology. On the other hand this seems to mess up the ordering according to 
Gyula so it will not be a perfect solution unless we finally fix iterations 
properly, correct? I can see the identical problem on Matthias’s Storm port.


On 08 Oct 2015, at 17:19, Gyula Fóra wrote:

I agree that there are many things that need to be figured out properly for 
iterations, and I am okay with postponing them for the next release if we want 
to get this one out quickly.

The only problem is that this probably breaks the SAMOA connector.

Paris can you confirm this?

Stephan Ewen wrote (on Thu, 8 Oct 2015, 17:12):
For me as an outsider to the iterations, I would say that both approaches
are in some way tricky with some unexpected behavior.

Parallelism implicitly from the predecessor (input) or the successor (head
task - what happens if there are multiple with different parallelism?) can
confuse in either way.
I have the feeling that what each one perceives as more consistent or
intuitive depends a bit on their mental model of the iterations (given
their prior experience and expectations).

I agree that we should do something there. But given that we are apparently
not really close to knowing what would be best way to go (or agreeing on
it), I would like to not block 0.10 on this (workarounds are available
after all) and take this for the next release with enough time to properly
figure this out and discuss it.

The iterations will anyways need some work for the next release to
integrate them with checkpointing and watermarks, so would you agree that
we tackle this then as part of an effort to advance the iteration feature
as a whole?

Greetings,
Stephan



On Thu, Oct 8, 2015 at 4:42 PM, Gyula Fóra wrote:

> The feedback tuples might get rebalanced but the normal input should not.
>
> But still the main problem is the fact that partitioning is not handled
> transparently, and actually does not work the way you expect when you set it.
>
> Gyula
>
> Aljoscha Krettek wrote (on Thu, 8 Oct 2015, 16:33):
>
> > Ok, I see your point. But I think there will be problems no matter what
> > parallelism is chosen for the iteration source/sink. If the parallelism
> of
> > the head is chosen then there will be an implicit rebalance from the
> > operation right before the iteration to the iteration head. I think this
> > should break ordering as well, in your case.
> >
> > On Tue, 6 Oct 2015 at 10:39 Gyula Fóra wrote:
> >
> > > Hi,
> > >
> > > This is just a workaround, which actually breaks input order from my
> > > source. I think the iteration construction should be reworked to set
> the
> > > parallelism of the source/sink to the parallelism of the head operator
> > (and
> > > validate that all heads have the same parallelism).
> > >
> > > I thought this was the solution that you described with Stephan in some
> > > older discussion before the rewrite.
> > >
> > > Cheers,
> > > Gyula
> > >
> > > Aljoscha Krettek wrote (on Tue, 6 Oct 2015, 9:15):
> > >
> > > > Hi,
> > > > I think what you would like to do can be achieved by:
> > > >
> > > > IterativeStream it = in.map(IdentityMap).setParallelism(2).iterate()
> > > > DataStream mapped = it.map(...)
> > > >  it.closeWith(mapped.partitionByHash(someField))
> > > >
> > > > The input is rebalanced to the map inside the iteration as in your
> > > example
> > > > and the feedback should be partitioned by hash.
> > > >
> > > > Cheers,
> > > > Aljoscha
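Why the feedback edge needs a defined parallelism on the receiving side can be sketched in plain Java. This is a simplified illustration, not Flink's partitioner: `FeedbackPartitioning` and `channelFor` are hypothetical names. Hash partitioning maps a key to one of N receiving instances, so the same key always lands on the same instance only as long as N is fixed and known.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: routing feedback records to parallel head instances by key hash. */
final class FeedbackPartitioning {

    /** Picks the receiving instance for a key; floorMod keeps the index non-negative. */
    static int channelFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Distributes keys over one bucket per receiving instance. */
    static List<List<String>> partition(List<String> keys, int parallelism) {
        List<List<String>> channels = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            channels.add(new ArrayList<>());
        }
        for (String key : keys) {
            channels.get(channelFor(key, parallelism)).add(key);
        }
        return channels;
    }

    public static void main(String[] args) {
        List<String> feedback = Arrays.asList("a", "b", "a", "c", "b", "a");
        List<List<String>> channels = partition(feedback, 2);
        System.out.println(channels);
    }
}
```

With a parallelism of 1 every record goes to instance 0, which is why a hash-partitioned feedback stream of parallelism 2 cannot simply be wired to a head of parallelism 1 without a repartitioning step.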
> > > >
> > > >
> > > > On Tue, 6 Oct 2015 at 00:11 Gyula Fóra wrote:
> > > >
> > > > > Hey,
> > > > >
> > > > > This question is mainly targeted towards Aljoscha but maybe someone
> > can
> > > > > help me out here:
> > > > >
> > > > > I think the way feedback partitioning is handled does not work, let
> > me
> > > > > illustrate with a simple example:
> > > > >
> > > > > IterativeStream it = ... (parallelism 1)
> > > > > DataStream mapped = it.map(...) (parallelism 2)
> > > > > // this does not work as the feedback has parallelism 2 != 1
> > > > > // it.closeWith(mapped.partitionByHash(someField))
> > > > > > // so we need to rebalance the data
> > > > >
> > 

Re: Build get stuck at BarrierBufferMassiveRandomTest

2015-09-23 Thread Paris Carbone
It hangs for me too at the same test when doing "clean verify"

> On 23 Sep 2015, at 16:09, Stephan Ewen  wrote:
> 
> Okay, will look into this a bit today...
> 
> On Wed, Sep 23, 2015 at 4:04 PM, Ufuk Celebi  wrote:
> 
>> Same here.
>> 
>>> On 23 Sep 2015, at 13:50, Vasiliki Kalavri 
>> wrote:
>>> 
>>> Hi,
>>> 
>>> It's the latest master I'm trying to build, but it still hangs.
>>> Here's the trace:
>>> 
>>> -
>>> 2015-09-23 13:48:41
>>> Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.75-b04 mixed
>> mode):
>>> 
>>> "Attach Listener" daemon prio=5 tid=0x7faeb984a000 nid=0x3707 waiting
>>> on condition [0x]
>>>  java.lang.Thread.State: RUNNABLE
>>> 
>>> "Service Thread" daemon prio=5 tid=0x7faeb9808000 nid=0x4d03 runnable
>>> [0x]
>>>  java.lang.Thread.State: RUNNABLE
>>> 
>>> "C2 CompilerThread1" daemon prio=5 tid=0x7faebb00e800 nid=0x4b03
>>> waiting on condition [0x]
>>>  java.lang.Thread.State: RUNNABLE
>>> 
>>> "C2 CompilerThread0" daemon prio=5 tid=0x7faebb840800 nid=0x4903
>>> waiting on condition [0x]
>>>  java.lang.Thread.State: RUNNABLE
>>> 
>>> "Signal Dispatcher" daemon prio=5 tid=0x7faeba806800 nid=0x3d0f
>>> runnable [0x]
>>>  java.lang.Thread.State: RUNNABLE
>>> 
>>> "Finalizer" daemon prio=5 tid=0x7faebb836800 nid=0x3303 in
>>> Object.wait() [0x00014eff8000]
>>>  java.lang.Thread.State: WAITING (on object monitor)
>>> at java.lang.Object.wait(Native Method)
>>> - waiting on <0x000138a84858> (a java.lang.ref.ReferenceQueue$Lock)
>>> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)
>>> - locked <0x000138a84858> (a java.lang.ref.ReferenceQueue$Lock)
>>> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:151)
>>> at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
>>> 
>>> "Reference Handler" daemon prio=5 tid=0x7faebb004000 nid=0x3103 in
>>> Object.wait() [0x00014eef5000]
>>>  java.lang.Thread.State: WAITING (on object monitor)
>>> at java.lang.Object.wait(Native Method)
>>> - waiting on <0x000138a84470> (a java.lang.ref.Reference$Lock)
>>> at java.lang.Object.wait(Object.java:503)
>>> at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133)
>>> - locked <0x000138a84470> (a java.lang.ref.Reference$Lock)
>>> 
>>> "main" prio=5 tid=0x7faeb9009800 nid=0xd03 runnable
>> [0x00010f1c]
>>>  java.lang.Thread.State: RUNNABLE
>>> at java.net.PlainSocketImpl.socketAccept(Native Method)
>>> at
>> java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
>>> at java.net.ServerSocket.implAccept(ServerSocket.java:530)
>>> at java.net.ServerSocket.accept(ServerSocket.java:498)
>>> at
>>> 
>> org.apache.flink.streaming.api.functions.sink.SocketClientSinkTest.testSocketSinkRetryAccess(SocketClientSinkTest.java:315)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at
>>> 
>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>>> at
>>> 
>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>> at
>>> 
>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>>> at
>>> 
>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>>> at
>>> 
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>>> at
>>> 
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>> at
>>> 
>> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>>> at
>>> 
>> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>>> at
>>> 
>> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>>> at
>>> 
>> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
>>> at
>>> 
>> 

Re: [ANNOUNCE] Welcome Matthias Sax as new committer

2015-09-02 Thread Paris Carbone
About time ;)
Welcome!

> On 02 Sep 2015, at 08:13, Vasiliki Kalavri  wrote:
> 
> Congratulations Matthias!
>> On Sep 2, 2015 5:42 AM, "Ufuk Celebi"  wrote:
>> 
>> Welcome Matthias! The Storm compatibility is a great addition to Flink :-)
>> 
>> – Ufuk
>> 
>> On Wed, Sep 2, 2015 at 5:24 PM, Márton Balassi 
>> wrote:
>> 
>>> Welcome, Matthias :)
>>> 
>>> On Wed, Sep 2, 2015 at 5:21 PM, Aljoscha Krettek 
>>> wrote:
>>> 
 Welcome, welcome, welcome :D
 
> On Wed, 2 Sep 2015 at 15:53 Fabian Hueske  wrote:
> 
> Welcome on board Matthias!
>> On Sep 2, 2015 15:40, "Maximilian Michels"  wrote:
>> 
>> Well done, Matthias! Waiting for more exciting stuff to come. :)
>> 
>> Cheers,
>> Max
>> 
>> On Wed, Sep 2, 2015 at 2:29 PM, Stephan Ewen 
>>> wrote:
>> 
>>> Welcome!
>>> 
>>> On Wed, Sep 2, 2015 at 2:08 PM, Till Rohrmann <
>>> trohrm...@apache.org>
>>> wrote:
>>> 
 Congratulations Matthias! Welcome on board :-)
 
 On Wed, Sep 2, 2015 at 2:01 PM, Robert Metzger <
 rmetz...@apache.org>
 wrote:
 
> Congrats Matthias!
> 
> On Wed, Sep 2, 2015 at 1:34 PM, Timo Walther <
>>> twal...@apache.org
> 
>>> wrote:
> 
>> Congratulations Matthias!
>> 
>> Regards,
>> Timo
>> 
>> 
>>> On 02.09.2015 13:32, Chiwan Park wrote:
>>> 
>>> Welcome Matthias! :)
>>> 
>>> Regards,
>>> Chiwan Park
>>> 
>>> On Sep 2, 2015, at 8:30 PM, Kostas Tzoumas <
 ktzou...@apache.org
>> 
 wrote:
 
 The Project Management Committee (PMC) of Apache Flink
>> has
> asked
> Matthias
 Sax to become a committer, and we are pleased to announce
 that
> he
>>> has
 accepted.
 
 Matthias has been very active with Flink, and he is the
> original
 contributor of the Storm compatibility functionality.
 
 Being a committer enables easier contribution to the
>>> project
>> since
> there
 is no
 need to go via the pull request submission process. This
 should
 enable
 better
 productivity. Being a PMC member enables assistance with
>>> the
 management
 and
 to guide the direction of the project.
 
 Please join me in welcoming Matthias as a new committer!
>> 


Re: [Proposal] Addition to Gelly

2015-08-11 Thread Paris Carbone
Hi Andra and nice to meet you btw :)

It sounds like a very fancy way to deal with skew; I like the idea even though I 
am not a graph analytics expert.
Have you run any experiments or benchmarks to see when this is preferable? Users 
should know when they will benefit from using it, since node splitting 
doesn’t come for free, I guess.
I am really eager to see how this will evolve; I think it’s a good effort.

cheers
Paris


 On 11 Aug 2015, at 14:58, Andra Lungu lungu.an...@gmail.com wrote:
 
 Hi Vasia,
 
 I shall polish the functions a bit, but this is more or less what I had in
 mind:
 GSA Jaccard [what we have in Gelly right now]:
 https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/GSAJaccardSimilarityMeasure.java
 The same version with node split:
 https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/NodeSplittingGSAJaccard.java
 
 Yes, sure I can also share some charts with the results. I could say for
 which data sets (power law) and types of algorithms this can be used. If
 it's used appropriately, the overhead is 0.
 
 I'm open to suggestions!
 Thanks!
 Andra
 
 
 On Tue, Aug 11, 2015 at 2:45 PM, Vasiliki Kalavri vasilikikala...@gmail.com
 wrote:
 
 Hi Andra,
 
 thanks for offering to add this work to Gelly and for starting the
 discussion!
 
 How do you think this would look like from an API point of view? Is it easy
 to make it transparent to the application? Could you give us a simple
 example of what you have in mind?
 
 Apart from usability, we should also consider documenting when to use this
 feature, i.e. for which algorithms, what kind of degree distribution and what
 overhead to expect (if any).
 
 Cheers,
 Vasia.
 On Aug 10, 2015 10:47 AM, Andra Lungu lungu.an...@gmail.com wrote:
 
 Hey,
 
 Before actually opening a PR, I wanted to hear your opinion. So, here
 goes
 nothing :).
 
 I'd like to add the core of my master thesis to Gelly. That is, a series
 of
 operators that take a skewed graph, split its high degree vertices into
 subvertices and redistribute the edges accordingly (thus producing the
 result faster due to the increased parallelism); then merge the
 subvertices
 into the initial vertex.
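 The split / aggregate / merge idea described above can be sketched in plain Java. This is a minimal illustration under assumptions, not the operators from the thesis: `NodeSplitting`, `split`, `gather` and `merge` are hypothetical names, and the aggregate is a plain sum. The approach only applies to aggregates that are associative and commutative, so that partial results can be combined in any order.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of node splitting: split, aggregate per sub-vertex, merge. */
final class NodeSplitting {

    /** Splits a vertex's neighbor values round-robin into `parts` sub-vertices. */
    static List<List<Integer>> split(List<Integer> neighborValues, int parts) {
        List<List<Integer>> subVertices = new ArrayList<>();
        for (int i = 0; i < parts; i++) {
            subVertices.add(new ArrayList<>());
        }
        for (int i = 0; i < neighborValues.size(); i++) {
            subVertices.get(i % parts).add(neighborValues.get(i));
        }
        return subVertices;
    }

    /** Partial aggregate of one sub-vertex; in a real system these could run in parallel. */
    static long gather(List<Integer> subVertex) {
        long sum = 0;
        for (int value : subVertex) {
            sum += value;
        }
        return sum;
    }

    /** Merges the partial results back into the value of the original vertex. */
    static long merge(List<List<Integer>> subVertices) {
        long total = 0;
        for (List<Integer> subVertex : subVertices) {
            total += gather(subVertex);
        }
        return total;
    }

    public static void main(String[] args) {
        List<Integer> hub = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            hub.add(i);
        }
        long merged = merge(split(hub, 3));
        System.out.println(merged);
    }
}
```

 The merged value equals the sum over the unsplit neighbor list; the work per sub-vertex shrinks with the number of parts.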
 
 Here's part of my -unrevised- abstract:
 The Twitter follower graph, the citation network and general purpose
 social networks follow a power law degree distribution. These highly
 skewed
 graphs raise challenges to current computation models which uniformly
 process vertices, leading to communication overhead or large execution
 time
 stalls.
 
 In spite of efforts made to scale up computation on natural graphs
 (PowerGraph’s Gather-Apply-Scatter model), many performance problems
 arise from a subset of vertices that have high degrees. In this thesis,
 we
 outline the main processing issues that arise in current graph systems.
 We
 then propose a novel node splitting technique meant to differentiate the
 computation as well as partitioning strategies for low and high degree
 nodes.
 
 The “Node Splitting” abstraction is implemented as a separate set of
 operators that can be seamlessly combined with current degree-oblivious
 models such as Pregel and PowerGraph, but also with degree-aware engines
 such as Pregel+ and PowerLyra. Our solution introduces minimal overhead
 in
 the absence of skew and improves the naive implementation of a subset of
 computational intensive algorithms by a factor of two.
 
 These results have been proven on a theoretical and practical level on
 both
 real world and synthetic graphs.
 
 What I would add:
 1) the operators per se in a separate package
 2) a slightly modified example (e.g. take Jaccard Similarity and apply
 these operators on it)
 3) tests
 4) a separate section in the gelly docs that explains how to modify your
 code to use this solution.
 
 Needless to say I will maintain the code and, if, as we get more users,
 they deem this sub-API useless, we can always deprecate and delete it :).
 
 So, what do you think?
 Andra
 
 



Re: Design documents for consolidated DataStream API

2015-07-13 Thread Paris Carbone
+1 
No further concerns from my side either

 On 13 Jul 2015, at 18:30, Gyula Fóra gyula.f...@gmail.com wrote:
 
 +1
 On Mon, Jul 13, 2015 at 6:23 PM Stephan Ewen se...@apache.org wrote:
 
 If naming is the only concern, then we should go ahead, because we can
 change names easily (before the release).
 
 In fact, I don't think it leaves a bad impression. Global windows are
 non-parallel windows. There are also parallel windows. Pick what you need
 and what works.
 
 
 On Mon, Jul 13, 2015 at 6:13 PM, Gyula Fóra gyula.f...@gmail.com wrote:
 
 I think we agree on everything, it's more of a naming issue :)
 
 I thought it might be misleading that global time windows are
 non-parallel windows. We don't want to give a bad impression. (Also we
 don't want them to think that every global window is parallel but that's
 not
 a problem here)
 
 Gyula
 On Mon, Jul 13, 2015 at 5:22 PM Stephan Ewen se...@apache.org wrote:
 
 Okay, what is missing about the windowing in your opinion?
 
 The core points of the document are:
 
  - The parallel windows are per group only.
 
  - The implementation of the parallel windows holds window data in the
 group buffers.
 
  - The global windows are non-parallel. May have parallel
 pre-aggregation,
 if they are time windows.
 
  - Time may be operator time (timer thread), or watermark time.
 Watermark
 time can refer to ingress or event time.
 
  - Windows that do not pre-aggregate may require elements in order.
 Not
 part of the first prototype.
 
 Do we agree on those points?
 
 
 On Mon, Jul 13, 2015 at 4:50 PM, Gyula Fóra gyula.f...@gmail.com
 wrote:
 
 In general I like it, although the main difference between the
 current
 and
 the new one is the windowing and that is still not very clear.
 
 Where do we have the full stream time windows for instance?(which is
 parallel but not keyed)
 On Mon, Jul 13, 2015 at 4:28 PM Aljoscha Krettek 
 aljos...@apache.org
 wrote:
 
 +1 I like it as well.
 
 On Mon, 13 Jul 2015 at 16:17 Kostas Tzoumas ktzou...@apache.org
 wrote:
 
 +1 from my side
 
 On Mon, Jul 13, 2015 at 4:15 PM, Stephan Ewen se...@apache.org
 wrote:
 
 Do we have consensus on these designs?
 
 If we have, we should get to implementing this soon, because
 basically
 all
 streaming patches will have to be revisited in light of this...
 
 On Tue, Jul 7, 2015 at 3:41 PM, Gyula Fóra 
 gyula.f...@gmail.com
 
 wrote:
 
 You are right, that's an important issue.
 
 And I think we should also do some renaming with the
 iterations
 because
 they are not really iterations like in the batch case and it
 might
 confuse
 some users.
 Maybe we can call them loops or cycles and rename the api
 calls
 to
 make
 it
 more intuitive what happens. It is really just a cyclic
 dataflow.
 
 Aljoscha Krettek aljos...@apache.org wrote (on Tue, 7 Jul 2015, 15:35):
 
 Hi,
 I just noticed that we don't have anything about how
 iterations
 and
 timestamps/watermarks should interact.
 
 Cheers,
 Aljoscha
 
 On Mon, 6 Jul 2015 at 23:56 Stephan Ewen se...@apache.org
 
 wrote:
 
 Hi all!
 
 As many of you know, there are ongoing efforts to
 consolidate
 the
 streaming API for the next release, and then graduate it
 (from
 beta
 status).
 
 In the process of this consolidation, we want to achieve
 the
 following
 goals.
 
 - Make the code more robust and simplify it in parts
 
 - Clearly define the semantics of the constructs.
 
 - Prepare it for support of more advanced concepts, like
 partitionable
 state, and event time.
 
 - Cut support for certain corner cases that were
 prototyped,
 but
 turned
 out to be not efficiently doable
 
 
 Based on prior discussions on the mailing list, Aljoscha
 and
 me
 drafted
 the
 design documents below, which outline how the
 consolidated
 API
 would
 look like.
 We focused on constructs, time, and window semantics.
 
 
 Design document on how to restructure the Streaming API:
 
 https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams
 
 Design document on definitions of time, order, and the
 resulting
 semantics:
 
 https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams
 
 
 
 Note: The design of the interfaces and concepts for
 advanced
 state
 in
 functions is not in here. That is part of a separate
 design
 discussion
 and
 orthogonal to the designs drafted here.
 
 
 Please have a look and voice questions and concerns.
 Since
 we
 should
 not
 break the streaming API more than once, we should make
 sure
 this
 consolidation brings it into the shape we want it to be
 in.
 
 
 Greetings,
 Stephan
 
 
 
 
 
 
 
 
 
 



Re: Rework of streaming iteration API

2015-07-07 Thread Paris Carbone
Good points. If we want structured loops on streaming we will need to inject 
iteration counters. The question is if we really need structured iterations on 
plain data streams. Window iterations are a must-have on the other hand...

Paris

 On 07 Jul 2015, at 16:43, Kostas Tzoumas ktzou...@apache.org wrote:
 
 I see. Perhaps more important IMO is defining the semantics of stream loops
 with event time.
 
 The reason I asked about nested is that Naiad and other designs used a
 multidimensional timestamp to capture loops: (outer loop counter, inner
 loop counter, timestamp). I assume that currently making sense of which
 iteration an element comes from is left to the user. Should we aim to
 change that with the API redesign?
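 A loop-aware timestamp of the shape (outer loop counter, inner loop counter, timestamp) could be modelled as below. This is only a sketch under assumptions: `LoopTimestamp` is a hypothetical class, and the lexicographic total order is chosen purely for illustration; it is not claimed to be the ordering Naiad or any other system uses.

```java
import java.util.Comparator;

/** Hypothetical sketch of a loop-aware timestamp: (outer counter, inner counter, time). */
final class LoopTimestamp implements Comparable<LoopTimestamp> {

    // Lexicographic order: outer counter first, then inner counter, then time.
    private static final Comparator<LoopTimestamp> ORDER =
            Comparator.comparingInt((LoopTimestamp t) -> t.outer)
                    .thenComparingInt(t -> t.inner)
                    .thenComparingLong(t -> t.time);

    final int outer;
    final int inner;
    final long time;

    LoopTimestamp(int outer, int inner, long time) {
        this.outer = outer;
        this.inner = inner;
        this.time = time;
    }

    /** Timestamp of the same element after one more pass through the inner loop. */
    LoopTimestamp nextInner() {
        return new LoopTimestamp(outer, inner + 1, time);
    }

    /** Timestamp after one more pass through the outer loop; the inner counter restarts. */
    LoopTimestamp nextOuter() {
        return new LoopTimestamp(outer + 1, 0, time);
    }

    @Override
    public int compareTo(LoopTimestamp other) {
        return ORDER.compare(this, other);
    }

    @Override
    public String toString() {
        return "(" + outer + ", " + inner + ", " + time + ")";
    }

    public static void main(String[] args) {
        LoopTimestamp start = new LoopTimestamp(0, 0, 100L);
        LoopTimestamp afterTwoInner = start.nextInner().nextInner();
        System.out.println(start + " -> " + afterTwoInner + " -> " + afterTwoInner.nextOuter());
    }
}
```

 With counters carried on the element, the system rather than the user can tell which iteration an element comes from.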
 
 
 On Tue, Jul 7, 2015 at 4:30 PM, Gyula Fóra gyula.f...@gmail.com wrote:
 
 Okay, I am fine with this approach as well I see the advantages. Then we
 just need to find a suitable name for marking a FeedbackPoint :)
 
 Stephan Ewen se...@apache.org wrote (on Tue, 7 Jul 2015, 16:28):
 
 In Aljoscha's approach, we would need a special mutable stream. We could
 do
 it like this:
 
 DataStream source = ...
 
 FeedbackPoint pt = source.createFeedbackPoint();
 
 DataStream mapper = pt.map(noOpMapper)
 DataStream feedback = mapper.filter(...)
 pt.addFeedback(feedback)
 
 
 It is basically like the current approach, with different names.
 
 I actually like the current approach, because it is explicit where
 streams
 could be altered in hind-sight (after their definition).
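 Whatever the API ends up being called, the dataflow being described is a cycle: the normal input is unioned with a feedback stream, passes through the mapper, and a filter decides what re-enters the loop. A plain-Java simulation of that shape is sketched below. It is not Flink code; `FeedbackLoop` and `run` are hypothetical names, and the finite queue stands in for an unbounded stream.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical sketch of a cyclic dataflow: source -> map -> filter -> feedback. */
final class FeedbackLoop {

    /**
     * Runs every input through `mapper`; results matching `feedback` re-enter
     * the loop, all other results are emitted downstream.
     */
    static <T> List<T> run(List<T> source, Function<T, T> mapper, Predicate<T> feedback) {
        Deque<T> pending = new ArrayDeque<>(source); // normal input unioned with feedback
        List<T> output = new ArrayList<>();
        while (!pending.isEmpty()) {
            T mapped = mapper.apply(pending.poll());
            if (feedback.test(mapped)) {
                pending.add(mapped); // the feedback edge closes the cycle
            } else {
                output.add(mapped);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        source.add(3);
        source.add(1);
        // Decrement each element and feed it back while it is still positive.
        List<Integer> result = run(source, x -> x - 1, x -> x > 0);
        System.out.println(result);
    }
}
```

 The simulation terminates only because the feedback predicate eventually rejects every element; a streaming loop has no such end, which is part of why "iterate" may be a misleading name.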
 
 
 On Tue, Jul 7, 2015 at 4:20 PM, Gyula Fóra gyula.f...@gmail.com wrote:
 
 @Aljoscha:
 Yes, that's basically my point as well. This is what happens now too but
 we
 give this mutable datastream a special name : IterativeDataStream
 
 This can be handled in very different ways through the api, the goal
 would
 be to make something easy to use. I am fine with what we have now
 because I
 know how it works but it might confuse people to call it iterate.
 
 Aljoscha Krettek aljos...@apache.org wrote (on Tue, 7 Jul 2015, 16:18):
 
 I think it could work if we allowed a DataStream to be unioned after
 creation. For example:
 
 DataStream source = ..
 DataStream mapper = source.map(noOpMapper)
 DataStream feedback = mapper.filter(...)
 source.union(feedback)
 
 This would basically mean that a DataStream is mutable and can be
 extended
 after creation with more streams.
 
 On Tue, 7 Jul 2015 at 16:12 Aljoscha Krettek aljos...@apache.org
 wrote:
 
 I think this would be good yes. I was just about to open an Issue
 for
 changing the Streaming Iteration API. :D
 
 Then we should also make the implementation very straightforward
 and
 simple, right now, the implementation of the iterations is all over
 the
 place.
 
 On Tue, 7 Jul 2015 at 15:57 Gyula Fóra gyf...@apache.org wrote:
 
 Hey,
 
 Along with the suggested changes to the streaming API structure I
 think
 we
 should also rework the iteration api. Currently the iteration
 api
 tries
 to mimic the syntax of the batch API while the runtime behaviour
 is
 quite
 different.
 
 What we create instead of iterations is really just cyclic streams
 (loops
 in the streaming job), so the API should somehow be intuitive
 about
 this
 behaviour.
 
 I suggest to remove the explicit iterate call and instead add a
 method
 to
 the StreamOperators that allows to connect feedback inputs (create
 loops).
 It would look like this:
 
 A mapper that does nothing but iterates over some filtered input:
 
 *Current API :*
 DataStream source = ..
 IterativeDataStream it = source.iterate()
 DataStream mapper = it.map(noOpMapper)
 DataStream feedback = mapper.filter(...)
 it.closeWith(feedback)
 
 *Suggested API :*
 DataStream source = ..
 DataStream mapper = source.map(noOpMapper)
 DataStream feedback = mapper.filter(...)
 mapper.addInput(feedback)
 
 The suggested approach would let us define inputs to operators
 after
 they
 are created and implicitly union them with the normal input. This
 is I
 think a much clearer approach than what we have now.
 
 What do you think?
 
 Gyula
 
 
 
 
 
 



Re: Replacing Checkpointed interface with field annotations

2015-07-01 Thread Paris Carbone
+1 on offering both. 

This way we cover both simplicity and expressivity when needed. Annotations 
give a very clean and simple way for marking state imho.
Perhaps most casual users will find it much better to just tag the fields that 
they want to persist.

Paris

 On 01 Jul 2015, at 15:55, Stephan Ewen se...@apache.org wrote:
 
 Actually, this is the first rework of the state interface. There is only
 one released version, yet.
 What we are doing here is trying to make sure that this first rework will
 most likely also be the last for the foreseeable future.
 
 
 From the use cases I can think of, we need at least two different state
 checkpointing methods:
 
 
 1) The variant where state is abstracted as a key/value interface. This is
 the new partitionable state interface.
What is backed up is exactly what you put into the state. No need to make the
 operator aware of when checkpoints happen.
Most simple applications should be able to work against this interface.
 
 
 2) The variant where the user code gets a call onCheckpoint() (currently
 snapshotState()) and returns whatever it wants to be persisted. This is
 important if the streaming
flow interacts with outside systems and wants to groupCommit data on
 checkpoints.
 
The crucial thing here is that the value-to-be persisted by Flink may
 be in some cases not the actual data - that one has been periodically
 inserted into the external system.
The checkpointed value is only a key, epoch counter, or transaction ID
 that allows you to mark what has been inserted into the external system as
 part of that checkpoint.
 
This call to onCheckpoint() is not best-effort, but crucial and needs
 to succeed if a checkpoint is to be successful. Best effort is only
 notifyCompleteCheckpoint().
And we could make this message at-least-once, if that is needed for
 reliable interaction with the outside world.
 
In the last Flink meetup in the Bay Area, we had quite a discussion
 with some people about how interface (2) is powerful when trying to get
 exactly-once with external systems.
 
Also, with this interface, it is quite straightforward to make
 asynchronous snapshotting possible, and it can be extended to incremental
 snapshotting. It is not obvious to me
how the same should work on the annotation variant.
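Variant (2) can be sketched in plain Java as a sink that hands only an epoch counter to the checkpoint while the data itself lives in the external system. This is a deliberately simplified illustration under assumptions: `GroupCommitSink` and its in-memory lists are hypothetical stand-ins for an external store, and the sketch ignores the case where a checkpoint fails after the commit has already happened.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: a sink that checkpoints only an epoch counter, not the data. */
final class GroupCommitSink {

    // Stand-ins for an external system: pending writes and committed writes.
    private final List<String> uncommitted = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();
    private long epoch = 0;

    /** Writes a record to the external system without committing it yet. */
    void invoke(String record) {
        uncommitted.add(record);
    }

    /** Called on checkpoint: group-commits pending data and returns only the epoch. */
    long snapshotState(long checkpointId) {
        committed.addAll(uncommitted);
        uncommitted.clear();
        epoch = checkpointId;
        return epoch;
    }

    /** Called on recovery: drops writes made after the restored epoch. */
    void restoreState(long restoredEpoch) {
        uncommitted.clear();
        epoch = restoredEpoch;
    }

    List<String> committed() {
        return new ArrayList<>(committed);
    }

    long epoch() {
        return epoch;
    }

    public static void main(String[] args) {
        GroupCommitSink sink = new GroupCommitSink();
        sink.invoke("a");
        sink.invoke("b");
        long checkpointed = sink.snapshotState(1L);
        sink.invoke("c"); // written after the checkpoint, lost on failure
        sink.restoreState(checkpointed);
        List<String> visible = sink.committed();
        System.out.println(visible + " at epoch " + sink.epoch());
    }
}
```

The value handed to the checkpoint is a single `long`, regardless of how much data was written between checkpoints.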
 
 
 Concerning the annotated state:
 
 That is eye candy and nice. Would it hurt to have this and promote it as a
 shortcut to a state backup implementation using (2), where the
 snapshotState method would simply
 return the value of some fields?
 
 I know we should not offer too many different ways of doing things, but if
 we promote (2) as 2-general (interface) and 2-shortcut (annotation), I
 see no problem.
 
 
 Greetings,
 Stephan
 
 
 
 
 On Wed, Jul 1, 2015 at 11:59 AM, Robert Metzger rmetz...@apache.org wrote:
 
 I agree, if we want to change the interface, now is the best time.
 
 So you are suggesting to change the methods in the Checkpointed interface
 from
 
 T snapshotState(long checkpointId, long checkpointTimestamp) throws
 Exception;
 
 void restoreState(T state);
 
 to
 
 void onSnapshot(id, ts)
 void onRestore(id, ts)
 (+ user has to annotate checkpointed fields)
 
 I would say that the current interface is more powerful than what you
 are proposing (arguments will follow)
 I don't think that there is an advantage in usability for the user
 with the new methods (but that is a matter of taste ... )
 
 I think that the current interface is more powerful because it allows
 you to give the system a derived state to back up, instead of just the
 value of a variable. You would need to always update the derived state
 so that the system can back it up when it needs to.
 With the method, you can do this only on demand.
 For the restore method, with the old interface, you can do sanity
 checks on the state to restore (something the only user of these
 interfaces (the kafka source) is actually doing). With your proposed
 interface, I would need to validate data from a field.
 The proposed restore method would also make it harder to restore from
 a derived state.
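 The two arguments above (derive the state only when a snapshot is requested, and validate it on restore) can be sketched as follows. The interface is a local re-declaration of the two method signatures quoted in this message, not an import of the Flink class; `OffsetTracker` is a hypothetical example loosely modelled on a source that tracks one offset per partition.

```java
import java.util.Arrays;

/** Local re-declaration of the signatures quoted in the thread, for illustration only. */
interface Checkpointed<T> {
    T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;

    void restoreState(T state);
}

/** Hypothetical example: tracks one offset per partition; -1 means "nothing read yet". */
final class OffsetTracker implements Checkpointed<long[]> {

    private final long[] offsets;

    OffsetTracker(int partitions) {
        offsets = new long[partitions];
        Arrays.fill(offsets, -1L);
    }

    void record(int partition, long offset) {
        offsets[partition] = offset;
    }

    long offset(int partition) {
        return offsets[partition];
    }

    /** The copy is made only when a snapshot is requested, not on every record. */
    @Override
    public long[] snapshotState(long checkpointId, long checkpointTimestamp) {
        return Arrays.copyOf(offsets, offsets.length);
    }

    /** Sanity-checks the state before accepting it. */
    @Override
    public void restoreState(long[] state) {
        if (state.length != offsets.length) {
            throw new IllegalArgumentException(
                    "expected " + offsets.length + " partitions, got " + state.length);
        }
        for (long offset : state) {
            if (offset < -1L) {
                throw new IllegalArgumentException("invalid offset " + offset);
            }
        }
        System.arraycopy(state, 0, offsets, 0, state.length);
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker(2);
        tracker.record(0, 5L);
        long[] snapshot = tracker.snapshotState(1L, 0L);
        OffsetTracker recovered = new OffsetTracker(2);
        recovered.restoreState(snapshot);
        System.out.println(recovered.offset(0) + " " + recovered.offset(1));
    }
}
```

 With an annotation-only variant the validation in `restoreState` would have to run somewhere else, after the framework has already written the field.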
 
 
 On Wed, Jul 1, 2015 at 11:38 AM, Gyula Fóra gyula.f...@gmail.com wrote:
 
 I understand your concerns Robert but I don't fully agree.
 
 The Checkpointed interface works indeed but there are so many use cases
 that it is not suitable for in the long run, and also the whole interface
 is slightly awkward in my opinion when returning simple fields which are
 already serializable.
 
 This motivated the introduction of the OperatorStateInterface which you
 can
 call the first rework of the checkpointed interface, but I see that as
 the
 first version which is actually capable of handling many issues that were
 obvious with the Checkpointed interfaces.
 
 This is actually not only a rework of the interface but the rework of the
 state concept and runtime handling. This needs to be clean if we are
 moving
 streaming out of beta, and should provide the needed 

Re: Stream iteration head as ConnectedDataStream

2015-06-26 Thread Paris Carbone
That’s convenient, at least for the incremental ML where feedback streams are 
the norm.
In this case we don’t force the user to create wrappers and we also know what 
comes from where.

I went through the PR and it looks like it doesn’t break anything, so you have my 
+1.

Paris


 On 26 Jun 2015, at 13:35, Gyula Fóra gyf...@apache.org wrote:
 
 Hey!
 
 Now that we are implementing more and more applications for streaming that
 use iterations we realized a huge shortcoming of the current iteration api.
 Currently it only allows feeding back data of the same type to the iteration
 head.
 
 This makes sense because the operators are typed but makes it awkward if we
 indeed want to use a different feedback (such as a model syncing for
 machine learning applications). To do this developers need to use wrapper
 types and flags to distinguish the inputs.
 
 I propose to add the possibility to treat the original input of the
 iteration head operator and the feedback stream as a ConnectedDataStream so
 we can apply operators such as CoMap, CoFlatMap etc. This helps
 distinguishing the inputs and also allows different feedback types to be
 used. I believe this change is inevitable if we want to write elegant
 applications without unnecessary wrapper types.
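 The benefit of a two-input head can be sketched in plain Java. This is an illustration under assumptions, not the code from the PR: `CoMapSketch` is a hypothetical local interface modelled on the CoMap idea, and `ModelScorer` is a made-up example in which the normal input carries data points and the feedback carries model updates of a different type.

```java
/** Hypothetical two-input function, modelled on the CoMap idea from the thread. */
interface CoMapSketch<IN1, IN2, OUT> {
    /** Called for elements of the original input. */
    OUT map1(IN1 value);

    /** Called for elements of the feedback stream. */
    OUT map2(IN2 value);
}

/** Scores integer data points with a weight; the feedback stream updates the weight. */
final class ModelScorer implements CoMapSketch<Integer, Double, String> {

    private double weight = 1.0;

    @Override
    public String map1(Integer dataPoint) {
        return "score=" + (weight * dataPoint);
    }

    @Override
    public String map2(Double newWeight) {
        weight = newWeight;
        return "model=" + weight;
    }

    public static void main(String[] args) {
        ModelScorer scorer = new ModelScorer();
        String before = scorer.map1(3);
        String update = scorer.map2(2.0);
        String after = scorer.map1(3);
        System.out.println(before + " " + update + " " + after);
    }
}
```

 No wrapper type or flag is needed to tell a data point from a model update, because each input has its own method and its own type.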
 
 I made a PR https://github.com/apache/flink/pull/870 already that
 introduces this functionality (it is a very small change in fact).
 
 Cheers,
 Gyula



Re: Thoughts About Streaming

2015-06-25 Thread Paris Carbone
+1 for writing this down

 On 25 Jun 2015, at 18:11, Aljoscha Krettek aljos...@apache.org wrote:
 
 +1 go ahead
 
 On Thu, 25 Jun 2015 at 18:02 Stephan Ewen se...@apache.org wrote:
 
 Hey!
 
 This thread covers many different topics. Let's break this up into separate
 discussions.
 
 - Operator State is already driven by Gyula and Paris and happening on the
 above mentioned pull request and the followup discussions.
 
 - For windowing, this discussion has brought some results that we should
 sum up and clearly write down.
   I would like to chime in to do that based on what I learned from the
 document and this discussion. I also got some input from Marton about what
 he learned from mapping the Cloud DataFlow constructs to Flink.
   I'll draft a Wiki page (with the help of Aljoscha, Marton) that sums
 this up and documents it for users (if we decide to adopt this).
   Then we run this by Gyula, Matthias Sax and Kostas for feedback.
 
 - API style discussions should be yet another thread. This will probably
 be opened as people start to address that.
 
 
 I'll try to get a draft of the wiki version out tomorrow noon and send the
 link around.
 
 Greetings,
 Stephan
 
 
 
 On Thu, Jun 25, 2015 at 3:51 PM, Matthias J. Sax 
 mj...@informatik.hu-berlin.de wrote:
 
 Sure. I picked this up. Using the current model for occurrence time
 semantics does not work.
 
 I elaborated on this in the past many times (but nobody cared). It is
 important to make it clear to the user what semantics are supported.
 Claiming to support sliding windows doesn't mean anything; there are
 too many different semantics out there. :)
 
 
 On 06/25/2015 03:35 PM, Aljoscha Krettek wrote:
 Yes, I am aware of this requirement and it would also be supported in
 my
 proposed model.
 
 The problem is, that the custom timestamp feature gives the
 impression
 that the elements would be windowed according to a user-timestamp. The
 results, however, are wrong because of the assumption about elements
 arriving in order. (This is what I was trying to show with my fancy
 ASCII
 art and result output.)
 
 On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax 
 mj...@informatik.hu-berlin.de
 wrote:
 
 Hi Aljoscha,
 
 I like that you are pushing in this direction. However, IMHO you
 misinterpret the current approach. It does not assume that tuples
 arrive in-order; the current approach has no notion of a
 pre-defined order (for example, the order in which the events are
 created). There is only the notion of arrival-order at the operator.
 From this arrival-order perspective, the results are correct(!).
 
 Windowing in the current approach means, for example, summing up an
 attribute of all events you *received* in the last 5 seconds. That
 has a different meaning than summing up an attribute of all events that
 *occurred* in the last 5 seconds. Both queries are valid and Flink should
 support both IMHO.
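To make the difference concrete, here is a small sketch with made-up data. The `Event` class, its field names and the sample values are assumptions for illustration only. One event occurs inside the first 5-second window but is received after it, so the two queries give different sums for the same window.

```java
import java.util.Arrays;
import java.util.List;

final class TimeSemanticsDemo {
    // An event with the time it was created and the time the operator received it.
    static final class Event {
        final long occurredAtMs;
        final long receivedAtMs;
        final int value;

        Event(long occurredAtMs, long receivedAtMs, int value) {
            this.occurredAtMs = occurredAtMs;
            this.receivedAtMs = receivedAtMs;
            this.value = value;
        }
    }

    // Sum over events *received* in [fromMs, toMs): arrival-order semantics.
    static int sumReceivedIn(List<Event> events, long fromMs, long toMs) {
        int sum = 0;
        for (Event e : events) {
            if (e.receivedAtMs >= fromMs && e.receivedAtMs < toMs) {
                sum += e.value;
            }
        }
        return sum;
    }

    // Sum over events that *occurred* in [fromMs, toMs): occurrence-time semantics.
    static int sumOccurredIn(List<Event> events, long fromMs, long toMs) {
        int sum = 0;
        for (Event e : events) {
            if (e.occurredAtMs >= fromMs && e.occurredAtMs < toMs) {
                sum += e.value;
            }
        }
        return sum;
    }

    static List<Event> sample() {
        return Arrays.asList(
            new Event(1000, 1100, 1),
            new Event(4000, 4200, 2),
            new Event(4500, 6000, 4),   // occurred in the first window, arrived in the second
            new Event(5500, 5600, 8));
    }

    public static void main(String[] args) {
        List<Event> events = sample();
        System.out.println("received in [0s,5s): " + sumReceivedIn(events, 0, 5000));
        System.out.println("occurred in [0s,5s): " + sumOccurredIn(events, 0, 5000));
    }
}
```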
 
 
 -Matthias
 
 
 
 On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:
 Yes, now this also processes about 3 mio Elements (Window Size 5 sec,
 Slide
 1 sec) but it still fluctuates a lot between 1 mio. and 5 mio.
 
 Performance is not my main concern, however. My concern is that the
 current
 model assumes elements to arrive in order, which is simply not true.
 
 In your code you have these lines for specifying the window:
 .window(Time.of(1l, TimeUnit.SECONDS))
 .every(Time.of(1l, TimeUnit.SECONDS))
 
 Although this semantically specifies a tumbling window of size 1 sec
 I'm
 afraid it uses the sliding window logic internally (because of the
 .every()).
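A rough sketch of why size == slide is semantically a tumbling window. It assumes windows aligned to time 0 with a new window opening every `slide` milliseconds; the helper `windowStarts` is hypothetical and does not reflect how the windowing policies are implemented internally. With equal size and slide every element falls into exactly one window, whereas a 5 sec window sliding by 1 sec puts it into five.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowAssignmentDemo {
    // Start times of all windows [start, start + size) that contain timestamp t,
    // assuming t >= 0, windows aligned to time 0, and one new window every `slide` ms.
    static List<Long> windowStarts(long t, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long latestStart = t - (t % slide);
        for (long start = latestStart; start > t - size && start >= 0; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(windowStarts(7300, 1000, 1000)); // size == slide: one window
        System.out.println(windowStarts(7300, 5000, 1000)); // size 5 s, slide 1 s: five windows
    }
}
```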
 
 In my tests I only have the first line.
 
 
 On Thu, 25 Jun 2015 at 14:32 Gábor Gévay gga...@gmail.com wrote:
 
 I'm very sorry, I had a bug in the InversePreReducer. It should be
 fixed now. Can you please run it again?
 
 I also tried to reproduce some of your performance numbers, but I'm
 getting only less than 1/10th of yours. For example, in the Tumbling
 case, Current/Reduce produces only ~10 for me. Do you have any
 idea what I could be doing wrong? My code:
 http://pastebin.com/zbEjmGhk
 I am running it on a 2 GHz Core i7.
 
 Best regards,
 Gabor
 
 
 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek aljos...@apache.org:
 Hi,
 I also ran the tests on top of PR 856 (inverse reducer) now. The
 results
 seem incorrect. When I insert a Thread.sleep(1) in the tuple
 source,
 all
 the previous tests reported around 3600 tuples (Size 5 sec, Slide 1
 sec)
 (Theoretically there would be 5000 tuples in 5 seconds but this is
 due
 to
 overhead). These are the results for the inverse reduce
 optimisation:
 (Tuple 0,38)
 (Tuple 0,829)
 (Tuple 0,1625)
 (Tuple 0,2424)
 (Tuple 0,3190)
 (Tuple 0,3198)
 (Tuple 0,-339368)
 (Tuple 0,-1315725)
 (Tuple 0,-2932932)
 (Tuple 0,-5082735)
 (Tuple 0,-7743256)
 (Tuple 0,75701046)
 (Tuple 0,642829470)
 (Tuple 0,2242018381)
 (Tuple 0,5190708618)
 (Tuple 0,10060360311)
 (Tuple 0,-94254951)
 (Tuple 0,-219806321293)
 (Tuple 0,-1258895232699)
 (Tuple 0,-4074432596329)
 
 One line is one 

RE: Force enabling checkpoints for iterative streaming jobs

2015-06-10 Thread Paris Carbone

To continue Gyula's point, for consistent snapshots we need to persist the 
records in transit within the loop  and also slightly change the current 
protocol since it works only for DAGs. Before going into that direction though 
I would propose we first see whether there is a nice way to make iterations 
more structured.

Paris

From: Gyula Fóra gyula.f...@gmail.com
Sent: Wednesday, June 10, 2015 10:19 AM
To: dev@flink.apache.org
Subject: Re: Force enabling checkpoints for iterative streaming jobs

I disagree. Not having checkpointed operators inside the iteration still
breaks the guarantees.

It is not about the states it is about the loop itself.
On Wed, Jun 10, 2015 at 10:12 AM Aljoscha Krettek aljos...@apache.org
wrote:

 This is the answer I gave on the PR (we should have one place for
 discussing this, though):

 I would be against merging this in the current form. What I propose is
 to analyse the topology to verify that there are no checkpointed
 operators inside iterations. Operators before and after iterations can
 be checkpointed and we can safely allow the user to enable
 checkpointing.

 If we have the code to analyse which operators are inside iterations
 we could also disallow windows inside iterations. I think windows
 inside iterations don't make sense since elements in different
 iterations would end up in the same window. Maybe I'm wrong here
 though, then please correct me.

 On Wed, Jun 10, 2015 at 10:08 AM, Márton Balassi
 balassi.mar...@gmail.com wrote:
  I agree that for the sake of the above mentioned use cases it is
 reasonable
  to add this to the release with the right documentation, for machine
  learning potentially losing one round of feedback data should not
 matter.
 
  Let us not block prominent users until the next release on this.
 
  On Wed, Jun 10, 2015 at 8:09 AM, Gyula Fóra gyula.f...@gmail.com
 wrote:
 
  As for people currently suffering from it:
 
  An application King is developing requires iterations, and they need
  checkpoints. Practically all SAMOA programs would need this.
 
  It is very likely that the state interfaces will be changed after the
  release, so this is not something that we can just add later. I don't
 see a
  reason why we should not add it, as it is clearly documented. In this
  actual case not having guarantees at all means people will never use it
 in
  any production system. Having limited guarantees means that it will
 depend
  on the application.
 
  On Wed, Jun 10, 2015 at 12:53 AM, Ufuk Celebi u...@apache.org wrote:
 
   Hey Gyula,
  
   I understand your reasoning, but I don't think it's worth rushing this
  into
   the release.
  
   As you've said, we cannot give precise guarantees. But this is
 arguably
   one of the key requirements for any fault tolerance mechanism.
 Therefore
  I
   disagree that this is better than not having anything at all. I think
 it
   will already go a long way to have the non-iterative case working
  reliably.
  
   And as far as I know there are no users really suffering from this at
 the
   moment (in the sense that someone has complained on the mailing list).
  
   Hence, I vote to postpone this.
  
   – Ufuk
  
   On 10 Jun 2015, at 00:19, Gyula Fóra gyf...@apache.org wrote:
  
Hey all,
   
It is currently impossible to enable state checkpointing for
 iterative
jobs, because an exception is thrown when creating the jobgraph.
 This
behaviour is motivated by the lack of precise guarantees that we can
  give
with the current fault-tolerance implementations for cyclic graphs.
   
This PR https://github.com/apache/flink/pull/812 adds an optional
   flag to
force checkpoints even in case of iterations. The algorithm will
 take
checkpoints periodically as before, but records in transit inside
 the
   loop
will be lost.
   
However even this guarantee is enough for most applications (Machine
Learning for instance) and certainly much better than not having
  anything
at all.
   
   
I suggest we add this to the 0.9 release as currently many
 applications
suffer from this limitation (SAMOA, ML pipelines, graph streaming
 etc.)
   
   
Cheers,
   
Gyula
  
  
 



Re: [DISCUSS] Re-add record copy to chained operator calls

2015-05-20 Thread Paris Carbone
I guess it was not intended ^^.

Chaining should be transparent and not break the correct/expected behaviour.


Paris

On 20 May 2015, at 11:02, Márton Balassi mbala...@apache.org wrote:

+1 for copying.
On May 20, 2015 10:50 AM, Gyula Fóra gyf...@apache.org wrote:

Hey,

The latest streaming operator rework removed the copying of the outputs
before passing them to chained operators. This is a major break for the
previous operator semantics which guaranteed immutability.

I think this change leads to highly non-deterministic program behaviour from
the user's perspective as only non-chained outputs/inputs will be mutable.
If we allow this to happen, users will start disabling chaining to get
immutability which defeats the purpose. (chaining should not affect program
behaviour just increase performance)

In my opinion the default setting for each operator should be immutability
and the user could override this manually if he/she wants.

What do you think?

Regards,
Gyula




RE: [DISCUSS] Re-add record copy to chained operator calls

2015-05-20 Thread Paris Carbone
@stephan I see your point. If we assume that operators do not hold references 
in their state to any transmitted records it works fine. We therefore need to 
make this clear to the users. I need to check if that would break semantics in 
SAMOA or other integrations as well that assume immutability. For example in 
SAMOA there are often local metric objects that are being constantly mutated 
and simply forwarded periodically to other (possibly chained) operators that 
need to evaluate them. 
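A small sketch of that concern, with hypothetical classes (`Metric`, `BufferingOperator`) that are not SAMOA or Flink code. An upstream function keeps mutating one local metric object and forwards it to a chained operator that stores what it receives. Without a copy, every buffered entry is the same object and shows the final value; with a copy before forwarding, each entry keeps the value it had when it was emitted.

```java
import java.util.ArrayList;
import java.util.List;

final class ChainingAliasDemo {
    // A mutable metric that the upstream function keeps updating.
    static final class Metric {
        long count;

        Metric(long count) {
            this.count = count;
        }

        Metric copy() {
            return new Metric(count);
        }
    }

    // Downstream (chained) operator that holds on to the records it receives.
    static final class BufferingOperator {
        final List<Metric> buffer = new ArrayList<>();

        void process(Metric m) {
            buffer.add(m); // stores the reference it was handed
        }
    }

    static long[] run(boolean copyBeforeForwarding) {
        Metric local = new Metric(0);
        BufferingOperator downstream = new BufferingOperator();
        for (int i = 0; i < 3; i++) {
            local.count++; // mutate the local object, then forward it
            downstream.process(copyBeforeForwarding ? local.copy() : local);
        }
        long[] seen = new long[downstream.buffer.size()];
        for (int i = 0; i < seen.length; i++) {
            seen[i] = downstream.buffer.get(i).count;
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run(false))); // aliased entries
        System.out.println(java.util.Arrays.toString(run(true)));  // independent copies
    }
}
```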


From: Gyula Fóra gyf...@apache.org
Sent: Wednesday, May 20, 2015 2:06 PM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Re-add record copy to chained operator calls

Copy before putting it into a window buffer and any other group buffer.

Exactly my point. Any stateful operator should be able to implement
something like this without having to worry about copying the object (and
at this point the user would need to know whether it comes from the network
to avoid unnecessary copies), so I don't agree with leaving the copy off.

The user can of course specify that the operator is mutable if he wants
(and he is worried about the performance), But I still think the default
behaviour should be immutable.
We cannot force users to not hold object references and also it is a quite
unnatural way of programming in a language like java.


On Wed, May 20, 2015 at 1:39 PM, Stephan Ewen se...@apache.org wrote:

 I am curious why the copying is actually needed.

 In the batch API, we chain and do not copy and it is rather predictable.

 The cornerstones of that design are the following rules:

  1) Objects read from the network or any buffer are always new objects.
 That comes naturally when they are deserialized as part of that (all
 buffers store serialized)

  2) After a function returns a record (or gives one to the collector), it
 is given to the chain of chained operators, but after it is through the
 chain, no one else holds a reference to that object.
  For that, it is crucial that objects are not stored by reference, but
 either stored serialized, or a copy is stored.

 This is quite solid in the batch API. How about we follow the same paradigm
 in the streaming API. We would need to adjust the following:

 1) Do not copy between operators (I think this is the case right now)

 2) Copy before putting it into a window buffer and any other group buffer.








 On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek aljos...@apache.org
 wrote:

  Yes, in fact I anticipated this. There is one central place where we
  can insert a copy step, in OperatorCollector in OutputHandler.
 
  On Wed, May 20, 2015 at 11:17 AM, Paris Carbone par...@kth.se wrote:
   I guess it was not intended ^^.
  
   Chaining should be transparent and not break the correct/expected
  behaviour.
  
  
   Paris
  
   On 20 May 2015, at 11:02, Márton Balassi mbala...@apache.org wrote:
  
   +1 for copying.
   On May 20, 2015 10:50 AM, Gyula Fóra gyf...@apache.org wrote:
  
   Hey,
  
   The latest streaming operator rework removed the copying of the outputs
   before passing them to chained operators. This is a major break for the
   previous operator semantics which guaranteed immutability.
  
   I think this change leads to highly non-deterministic program behaviour
 from
   the user's perspective as only non-chained outputs/inputs will be
  mutable.
   If we allow this to happen, users will start disabling chaining to get
   immutability which defeats the purpose. (chaining should not affect
  program
   behaviour just increase performance)
  
   In my opinion the default setting for each operator should be
  immutability
   and the user could override this manually if he/she wants.
  
   What do you think?
  
   Regards,
   Gyula
  
  
 



Re: Gelly and ML for Streaming

2015-05-12 Thread Paris Carbone
Hi again Suminda! 

The stream ML api will be along the lines of the batch ML library and will have 
some interesting features.  We consider re-using as much as possible from the 
batch ML (e.g. the same data structures and general abstractions etc.). Faye 
and Martha (CCed) are looking into stream learners (eg. vertical hoeffding 
trees, concept drift detectors) and sampling techniques for their theses and 
part of their work will be most probably integrated into this library which we 
will be able to share soon.  

Furthermore, we are also looking, at an experimental level for now, into 
different ways of making online learners interact with offline ones in a single 
pipeline. If you have any ideas that you would want us to consider/look into at 
this early stage it would be great!

cheers
Paris

 On 12 May 2015, at 10:17, Vasiliki Kalavri va...@apache.org wrote:
 
 Hi Suminda,
 
 indeed this is a very exciting idea and we have been working on both Gelly
 streaming and ML streaming for a while here in Stockholm.
 
 Daniel has been looking into graph streaming for his thesis together with
 Paris and myself.
 We have evaluated existing streaming models and algorithms and we are
 working on designing a nice graph streaming API for Flink. This is a quite
 challenging area with a lot of open research issues and we hope we will be
 able to share some nice results soon!
 We already have some implementations of distributed graph streaming
 algorithms, which we are now looking into optimizing further.
 
 Paris can tell you more on the ML streaming work.
 
 By the way, do you have some specific use-case in mind or problem that you
 need graph/ML streaming for? If you would like to share it, maybe we can
 already help you :-)
 
 Cheers,
 Vasia.
 
 On 12 May 2015 at 09:19, Márton Balassi balassi.mar...@gmail.com wrote:
 
 Dear Suminda,
 
 Thanks for the suggestion.
 Some folks at Stockholm have started implementing these, but are a bit
 quiet about it. Paris, Vasia, Gyula and Daniel Bali can tell you more on
 the topic just to name a few.
 
 Best,
 
 Marton
 
 On Tue, May 12, 2015 at 7:37 AM, sirinath sirinath19...@gmail.com wrote:
 
 It would be good if you would consider how Gelly and ML can be used on
 top
 of
 streaming also from the onset.
 
 This would mean that in Gelly you should be able to read Graph mutation
 operation from the stream. Initial Graph (Vertices and Edges) if any
 followed by mutation stream.
 
 
 
 --
 View this message in context:
 
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Gelly-and-ML-for-Streaming-tp5563.html
 Sent from the Apache Flink Mailing List archive at Nabble.com.
 
 



Re: [DISCUSS] Change Streaming Operators to be Push-Only

2015-05-05 Thread Paris Carbone
I agree with Gyula on this one. Barriers should better not be exposed to the 
operator. They are system events for state management. Apart from that, 
watermark handling seems to be on the right track, I like it so far.

 On 05 May 2015, at 15:26, Aljoscha Krettek aljos...@apache.org wrote:
 
 I don't know, I just put that there because other people are working
 on the checkpointing/barrier thing. So there would need to be some
 functionality there at some point.
 
 Or maybe it is not required there and can be handled in the
 StreamTask. Others might know this better than I do right now.
 
 On Tue, May 5, 2015 at 3:24 PM, Gyula Fóra gyula.f...@gmail.com wrote:
 What would the processBarrier method do?
 
 On Tuesday, May 5, 2015, Aljoscha Krettek aljos...@apache.org wrote:
 
 I'm using the term punctuation and watermark interchangeably here
 because for practical purposes they do the same thing. I'm not sure
 what you meant with your comment about those.
 
 For the Operator interface I'm thinking about something like this:
 
 abstract class OneInputStreamOperator<IN, OUT, F extends Function> {
   public abstract void processElement(IN element);
   public abstract void processBarrier(...);
   public abstract void processPunctuation/lowWatermark(...);
 }
 
 The operator also has access to the TaskContext and ExecutionConfig
 and Serializers. The operator would emit values using an emit() method
 or the Collector interface, not sure about that yet.
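A minimal runnable sketch of a push-only chain along these lines. All names here (`Output`, `PushOperator`, `TimesTwo`, `PlusOne`, `into`) are hypothetical and chosen for the example; barriers are left out and watermarks are plain long timestamps. Whoever drives the chain calls processElement() and processWatermark() on the head operator, and each operator pushes to the next one, so non-element events travel through the chain the same way elements do.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this is not the interface proposed in the thread.
interface Output<T> {
    void collect(T element);
    void emitWatermark(long timestamp);
}

abstract class PushOperator<IN, OUT> {
    protected final Output<OUT> out;

    PushOperator(Output<OUT> out) {
        this.out = out;
    }

    abstract void processElement(IN element);

    // Default behaviour: forward the watermark unchanged.
    void processWatermark(long timestamp) {
        out.emitWatermark(timestamp);
    }
}

final class TimesTwo extends PushOperator<Integer, Integer> {
    TimesTwo(Output<Integer> out) {
        super(out);
    }

    @Override
    void processElement(Integer element) {
        out.collect(element * 2);
    }
}

final class PlusOne extends PushOperator<Integer, Integer> {
    PlusOne(Output<Integer> out) {
        super(out);
    }

    @Override
    void processElement(Integer element) {
        out.collect(element + 1);
    }
}

final class PushChainDemo {
    // Adapts a downstream operator so that an upstream operator can push into it.
    static <A, B> Output<A> into(final PushOperator<A, B> next) {
        return new Output<A>() {
            @Override
            public void collect(A element) {
                next.processElement(element);
            }

            @Override
            public void emitWatermark(long timestamp) {
                next.processWatermark(timestamp);
            }
        };
    }

    static List<String> run() {
        final List<String> log = new ArrayList<>();
        Output<Integer> sink = new Output<Integer>() {
            @Override
            public void collect(Integer element) {
                log.add("E:" + element);
            }

            @Override
            public void emitWatermark(long timestamp) {
                log.add("W:" + timestamp);
            }
        };
        PushOperator<Integer, Integer> head = new TimesTwo(into(new PlusOne(sink)));

        // The task, not the operator, owns the read loop and decides what to push.
        head.processElement(1);
        head.processElement(2);
        head.processWatermark(100L);
        head.processElement(3);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```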
 
 On Tue, May 5, 2015 at 3:12 PM, Gyula Fóra gyf...@apache.org
 wrote:
 I think this is a good idea in general. I would try to minimize the methods
 we
 include and make the ones that we keep very concrete. For instance I
 would
 not have the receive barrier method as that is handled on a totally
 different level already. And instead of punctuation I would directly add
 a
 method to work on watermarks.
 
 On Tuesday, May 5, 2015, Aljoscha Krettek aljos...@apache.org
 wrote:
 
 What do you mean by losing iterations?
 
 For the pros and cons:
 
 Cons: I can't think of any, since most of the operators are chainable
 already and already behave like a collector.
 
 Pros:
 - Unified model for operators, chainable operators don't have to
 worry about input iterators and the collect interface.
 - Enables features that we want in the future, such as barriers and
 punctuations because they don't work with the
  simple Collector interface.
 - The while-loop is moved outside of the operators, now the Task (the
 thing that runs Operators) can control the flow of data better and
 deal with
  stuff like barriers and punctuations. If we want to keep the
 main-loop inside each operator, then they all have to manage input
 readers and inline events manually.
 
 On Tue, May 5, 2015 at 2:41 PM, Kostas Tzoumas ktzou...@apache.org
 wrote:
 Can you give us a rough idea of the pros and cons? Do we lose some
 functionality by getting rid of iterations?
 
 Kostas
 
 On Tue, May 5, 2015 at 1:37 PM, Aljoscha Krettek aljos...@apache.org
 wrote:
 
 Hi Folks,
 while working on introducing source-assigned timestamps into
 streaming
 (https://issues.apache.org/jira/browse/FLINK-1967) I thought about
 how
 the punctuations (low watermarks) can be pushed through the system.
 The problem is, that operators can have two ways of getting input: 1.
 They read directly from input iterators, and 2. They act as a
 Collector and get elements via collect() from the previous operator
 in
 a chain.
 
 This makes it hard to push things through a chain that are not
 elements, such as barriers and/or punctuations.
 
 I propose to change all streaming operators to be push based, with a
 slightly improved interface: In addition to collect(), which I would
 call receiveElement() I would add receivePunctuation() and
 receiveBarrier(). The first operator in the chain would also get data
 from the outside invokable that reads from the input iterator and
 calls receiveElement() for the first operator in a chain.
 
 What do you think? I would of course be willing to implement this
 myself.
 
 Cheers,
 Aljoscha
 
 
 



Re: [DISCUSS] Change Streaming Operators to be Push-Only

2015-05-05 Thread Paris Carbone
By watermark handling I meant making punctuations explicit and 
forwarding/modifying them at the operator level. I think this is clear so far.
 On 05 May 2015, at 15:41, Aljoscha Krettek aljos...@apache.org wrote:
 
 There is no watermark handling yet. :D
 
 But this would enable me to do this.
 
 On Tue, May 5, 2015 at 3:39 PM, Paris Carbone par...@kth.se wrote:
 I agree with Gyula on this one. Barriers should better not be exposed to the 
 operator. They are system events for state management. Apart from that, 
 watermark handling seems to be on the right track, I like it so far.
 
 On 05 May 2015, at 15:26, Aljoscha Krettek aljos...@apache.org wrote:
 
 I don't know, I just put that there because other people are working
 on the checkpointing/barrier thing. So there would need to be some
 functionality there at some point.
 
 Or maybe it is not required there and can be handled in the
 StreamTask. Others might know this better than I do right now.
 
 On Tue, May 5, 2015 at 3:24 PM, Gyula Fóra gyula.f...@gmail.com wrote:
 What would the processBarrier method do?
 
 On Tuesday, May 5, 2015, Aljoscha Krettek aljos...@apache.org wrote:
 
 I'm using the term punctuation and watermark interchangeably here
 because for practical purposes they do the same thing. I'm not sure
 what you meant with your comment about those.
 
 For the Operator interface I'm thinking about something like this:
 
 abstract class OneInputStreamOperator<IN, OUT, F extends Function> {
  public abstract void processElement(IN element);
  public abstract void processBarrier(...);
  public abstract void processPunctuation/lowWatermark(...);
 }
 
 The operator also has access to the TaskContext and ExecutionConfig
 and Serializers. The operator would emit values using an emit() method
 or the Collector interface, not sure about that yet.
 
 On Tue, May 5, 2015 at 3:12 PM, Gyula Fóra gyf...@apache.org
 wrote:
 I think this is a good idea in general. I would try to minimize the methods
 we
 include and make the ones that we keep very concrete. For instance I
 would
 not have the receive barrier method as that is handled on a totally
 different level already. And instead of punctuation I would directly add
 a
 method to work on watermarks.
 
 On Tuesday, May 5, 2015, Aljoscha Krettek aljos...@apache.org
 wrote:
 
 What do you mean by losing iterations?
 
 For the pros and cons:
 
 Cons: I can't think of any, since most of the operators are chainable
 already and already behave like a collector.
 
 Pros:
 - Unified model for operators, chainable operators don't have to
 worry about input iterators and the collect interface.
 - Enables features that we want in the future, such as barriers and
 punctuations because they don't work with the
 simple Collector interface.
 - The while-loop is moved outside of the operators, now the Task (the
 thing that runs Operators) can control the flow of data better and
 deal with
 stuff like barriers and punctuations. If we want to keep the
 main-loop inside each operator, then they all have to manage input
 readers and inline events manually.
 
 On Tue, May 5, 2015 at 2:41 PM, Kostas Tzoumas ktzou...@apache.org
 wrote:
 Can you give us a rough idea of the pros and cons? Do we lose some
 functionality by getting rid of iterations?
 
 Kostas
 
 On Tue, May 5, 2015 at 1:37 PM, Aljoscha Krettek aljos...@apache.org
 wrote:
 
 Hi Folks,
 while working on introducing source-assigned timestamps into
 streaming
 (https://issues.apache.org/jira/browse/FLINK-1967) I thought about
 how
 the punctuations (low watermarks) can be pushed through the system.
 The problem is, that operators can have two ways of getting input: 1.
 They read directly from input iterators, and 2. They act as a
 Collector and get elements via collect() from the previous operator
 in
 a chain.
 
 This makes it hard to push things through a chain that are not
 elements, such as barriers and/or punctuations.
 
 I propose to change all streaming operators to be push based, with a
 slightly improved interface: In addition to collect(), which I would
 call receiveElement() I would add receivePunctuation() and
 receiveBarrier(). The first operator in the chain would also get data
 from the outside invokable that reads from the input iterator and
 calls receiveElement() for the first operator in a chain.
 
 What do you think? I would of course be willing to implement this
 myself.
 
 Cheers,
 Aljoscha
 
 
 
 



Akka transparency and serialisation

2015-04-21 Thread Paris Carbone
Hello everyone,


Many of you are already aware of this but it is good to make it clear in the 
mailing list. We bumped into this special case with Akka several times already and 
it is important to know where transparency actually breaks.


In short, Akka serialises only messages that get transferred over the wire or 
across JVMs [1]. Thus, we should not rely on messages getting serialised for 
anything we want to transfer using Akka. To overcome this we should either:


1) Do a deep copy of everything passed via Akka messaging

2) Apply serialisation manually before sending messages and transfer only 
pre-serialized data.

3) Never rely on transient fields
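A small sketch of why point 3 matters. It uses plain Java serialization as a stand-in for whatever serializer is configured for remote messages, and the `Message` class is made up for the example. A message handed over by reference inside one JVM keeps its transient field, while the same message after a serialization round trip arrives with that field reset to null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class TransientFieldDemo {
    // Hypothetical message type with one regular and one transient field.
    static final class Message implements Serializable {
        private static final long serialVersionUID = 1L;
        final String payload;
        transient String cache; // skipped by Java serialization

        Message(String payload, String cache) {
            this.payload = payload;
            this.cache = cache;
        }
    }

    // Simulates sending across JVMs: serialize to bytes, then deserialize.
    static Message roundTrip(Message m) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(m);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Message) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Message local = new Message("hello", "cached");
        Message remote = roundTrip(local);
        System.out.println("local cache:  " + local.cache);
        System.out.println("remote cache: " + remote.cache);
    }
}
```

Options 1 and 2 in the list above amount to always going through something like roundTrip(), so that local and remote delivery behave the same.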


cheers

Paris


[1] http://doc.akka.io/docs/akka/snapshot/general/remoting.html



Re: Rework of the window-join semantics

2015-04-07 Thread Paris Carbone
Hello Matthias,

Sure, ordering guarantees are indeed a tricky thing, I recall having that 
discussion back in TU Berlin. Bear in mind though that DataStream, our 
abstract data type, represents a *partitioned* unbounded sequence of events. 
There are no *global* ordering guarantees made whatsoever in that model across 
partitions. If you see it more generally there are many “race conditions” in a 
distributed execution graph of vertices that process multiple inputs 
asynchronously, especially when you add joins and iterations into the mix (how 
do you deal with reprocessing “old” tuples that iterate in the graph). Btw have 
you checked the Naiad paper [1]? Stephan cited it a while ago and it is quite 
relevant to that discussion.

Also, can you cite the paper with the joining semantics you are referring to? 
That would be of good help I think.

Paris

[1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf

On 07 Apr 2015, at 11:50, Matthias J. Sax 
mj...@informatik.hu-berlin.de wrote:

Hi @all,

please keep me in the loop for this work. I am highly interested and I
want to help on it.

My initial thoughts are as follows:

1) Currently, system timestamps are used and the suggested approach can
be seen as state-of-the-art (there is actually a research paper using
the exact same join semantic). Of course, the current approach is
inherently non-deterministic. The advantage is, that there is no
overhead in keeping track of the order of records and the latency should
be very low. (Additionally, state recovery is simplified: because the
processing is inherently non-deterministic, recovery can be done with
relaxed guarantees.)

 2) The user should be able to switch on deterministic processing,
ie, records are timestamped (either externally when generated, or
timestamped at the sources). Because deterministic processing adds some
overhead, the user should decide for it actively.
In this case, the order must be preserved in each re-distribution step
(merging is sufficient, if order is preserved within each incoming
channel). Furthermore, deterministic processing can be achieved by sound
window semantics (and there is a bunch of them). Even for
single-stream-windows it's a tricky problem; for join-windows it's even
harder. From my point of view, it is less important which semantics are
chosen; however, the user must be aware how it works. The most tricky
part for deterministic processing, is to deal with duplicate timestamps
(which cannot be avoided). The timestamping for (intermediate) result
tuples, is also an important question to be answered.
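A sketch of the order-preserving merge mentioned in point 2, under simplifying assumptions: records are reduced to bare timestamps, every channel is already sorted and fully available as a list, and duplicate timestamps are broken by channel index (one possible rule, picked here only so the output is reproducible). A real streaming merge would also have to wait until every channel has a head element before emitting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

final class OrderedMergeDemo {
    // Merges channels that are each sorted by timestamp into one sorted sequence.
    // Output entries have the form "timestamp@channelIndex".
    static List<String> merge(List<List<Long>> channels) {
        // Heap entries: {timestamp, channelIndex, positionInChannel}
        PriorityQueue<long[]> heap = new PriorityQueue<>((a, b) -> {
            int byTimestamp = Long.compare(a[0], b[0]);
            // Tie-break duplicate timestamps by channel index to stay deterministic.
            return byTimestamp != 0 ? byTimestamp : Long.compare(a[1], b[1]);
        });
        for (int c = 0; c < channels.size(); c++) {
            if (!channels.get(c).isEmpty()) {
                heap.add(new long[] {channels.get(c).get(0), c, 0});
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            long[] head = heap.poll();
            int channel = (int) head[1];
            int position = (int) head[2];
            merged.add(head[0] + "@" + channel);
            if (position + 1 < channels.get(channel).size()) {
                heap.add(new long[] {channels.get(channel).get(position + 1), channel, position + 1});
            }
        }
        return merged;
    }

    static List<String> run() {
        List<List<Long>> channels = Arrays.asList(
            Arrays.asList(1L, 4L, 4L),
            Arrays.asList(2L, 4L, 7L));
        return merge(channels);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```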


-Matthias


On 04/07/2015 11:37 AM, Gyula Fóra wrote:
Hey,

I agree with Kostas, if we define the exact semantics how this works, this
is not more ad-hoc than any other stateful operator with multiple inputs.
(And I don't think any other system supports something similar)

We need to make some design choices that are similar to the issues we had
for windowing. We need to choose how we want to evaluate the windowing
policies (global or local) because that affects what kind of policies can
be parallel, but I can work on these things.

I think this is an amazing feature, so I wouldn't necessarily rush the
implementation for 0.9 though.

And thanks for helping writing these down.

Gyula

On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas 
ktzou...@apache.org wrote:

Yes, we should write these semantics down. I volunteer to help.

I don't think that this is very ad-hoc. The semantics are basically the
following. Assuming an arriving element from the left side:
(1) We find the right-side matches
(2) We insert the left-side arrival into the left window
(3) We recompute the left window
We need to see whether right window re-computation needs to be triggered as
well. I think that this way of joining streams is also what the symmetric
hash join algorithms were meant to support.
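The steps above can be sketched as a symmetric hash join. This is a simplified illustration with hypothetical names: both sides are keyed by a string, and step (3), recomputing or evicting from the window, is deliberately left out, so each side's buffer only grows. An arriving element first probes the opposite side's buffer and is then inserted into its own side.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SymmetricHashJoinDemo {
    private final Map<String, List<String>> left = new HashMap<>();
    private final Map<String, List<String>> right = new HashMap<>();

    // Arrival on the left side: (1) find right-side matches, (2) insert into the left buffer.
    List<String> onLeft(String key, String value) {
        List<String> results = new ArrayList<>();
        for (String r : right.getOrDefault(key, Collections.<String>emptyList())) {
            results.add(value + "|" + r);
        }
        left.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        return results;
    }

    // Arrival on the right side: the mirror image of onLeft.
    List<String> onRight(String key, String value) {
        List<String> results = new ArrayList<>();
        for (String l : left.getOrDefault(key, Collections.<String>emptyList())) {
            results.add(l + "|" + value);
        }
        right.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        return results;
    }

    static List<String> run() {
        SymmetricHashJoinDemo join = new SymmetricHashJoinDemo();
        List<String> out = new ArrayList<>();
        out.addAll(join.onLeft("a", "L1"));   // nothing on the right yet
        out.addAll(join.onRight("a", "R1"));  // matches L1
        out.addAll(join.onLeft("a", "L2"));   // matches R1
        out.addAll(join.onRight("b", "R2"));  // no left element with key b
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Which pairs are produced depends on the interleaving of arrivals once eviction is added, which is the non-predictability raised earlier in the thread.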

Kostas


On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen 
se...@apache.org wrote:

Is the approach of joining an element at a time from one input against a
window on the other input not a bit arbitrary?

This just joins whatever currently happens to be the window by the time
the
single element arrives - that is a bit non-predictable, right?

As a more general point: The whole semantics of windowing and when they
are triggered are a bit ad-hoc now. It would be really good to start
formalizing that a bit and put it down somewhere. Users need to be able
to clearly understand how to predict the output.



On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra 
gyula.f...@gmail.com
wrote:

I think it should be possible to make this compatible with the
.window().every() calls. Maybe if there is some trigger set in every
we
would not join that stream 1 by 1 but every so many elements. The
problem

Re: Storm compatibility layer for Flink (first beta available)

2015-04-02 Thread Paris Carbone
That’s pretty nice Matthias, we could use a compositional API in streaming that 
many people are familiar with.
I can also help in some parts, I see some issues we already encountered while 
creating the SAMOA adapter (e.g. dealing with cycles in the topology). Thanks 
again for initiating this!

Paris

 On 02 Apr 2015, at 23:14, Gyula Fóra gyf...@apache.org wrote:
 
 This sounds amazing :) thanks Matthias!
 
 Tomorrow I will spend some time to look through your work and give some
 comments.
 
 Also I would love to help with this effort so once we merge an initial
 prototype let's open some Jiras and I will pick some up :)
 
 Gyula
 
 On Thursday, April 2, 2015, Márton Balassi balassi.mar...@gmail.com wrote:
 
 Hey Matthias,
 
 Thanks, this is a really nice contribution. I just scrolled through the
 code, but I really like it and big thanks for the tests for the
 examples.
 
 The rebase Fabian suggested would help a lot when merging.
 
 
 
 On Thu, Apr 2, 2015 at 9:19 PM, Fabian Hueske fhue...@gmail.com wrote:
 
 Hi Matthias,
 
 this is really cool! I especially like that you can use Storm code within a
 Flink streaming program :-)
 
 One thing that might be good to do rather soon is to collect all your
 commits and put them on top of a fresh forked Flink master branch.
 When merging we cannot change the history and try to put only fast-forward
 commits on top of the master branch.
 As time goes on it becomes more likely that you run into merge issues when
 cherry-picking the commits.
 
 2015-04-02 21:09 GMT+02:00 Robert Metzger rmetz...@apache.org:
 
 Hey Henry,
 
 you can check out the files here:
 
 
 
 https://github.com/mjsax/flink/tree/flink-storm-compatibility/flink-staging/flink-streaming/flink-storm-compatibility
 ... so yes, they are located in the flink-streaming directory .. which is a
 good place for now.
 Once we move flink-streaming out of staging, we might want to keep the
 storm compat in staging.
 
 
 On Thu, Apr 2, 2015 at 8:16 PM, Henry Saputra henry.sapu...@gmail.com wrote:
 
 Hi Matthias,
 
 Where do you put the code for the Storm compatibility? Under streams
 module directory?
 
 - Henry
 
 On Thu, Apr 2, 2015 at 10:31 AM, Matthias J. Sax mj...@informatik.hu-berlin.de wrote:
 Hi @all,
 
 I started to work on a compatibility layer to run Storm topologies on
 Flink. I just pushed a first beta:
 https://github.com/mjsax/flink/tree/flink-storm-compatibility
 
 Please check it out, and let me know how you like it. In this first
 version, I tried to code without changing too many things in the
 existing code. However, a deeper integration would be nice.
 
 There are many things that do not work yet. Currently, only shuffle and
 fieldsGrouping are supported (and only Storm's default output stream).
 Furthermore, topologies must be simple, i.e., they cannot be configured
 with a Config object, and Storm meta information (i.e., TopologyContext) is
 also not there.
 
 The layer can be used to integrate existing Spouts and/or Bolts as Flink
 operators into a regular Flink program. Furthermore, a whole Storm
 topology can be executed by switching from TopologyBuilder to
 FlinkTopologyBuilder and from LocalCluster/NimbusClient/StormSubmitter
 to FlinkLocalCluster/FlinkClient/FlinkStormSubmitter.
 
 Examples of both cases are given as ITCases.
 
 
 Cheers,
  Matthias
 
 
 
 
 
 



[jira] [Created] (FLINK-1808) Omit sending checkpoint barriers when the execution graph is not running

2015-03-31 Thread Paris Carbone (JIRA)
Paris Carbone created FLINK-1808:


 Summary: Omit sending checkpoint barriers when the execution graph is not running
 Key: FLINK-1808
 URL: https://issues.apache.org/jira/browse/FLINK-1808
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Paris Carbone
Assignee: Paris Carbone


Currently the StreamCheckpointCoordinator sends barrier requests even when the 
executionGraph is in FAILING or RESTARTING status, which results in potentially 
unnecessary communication and space overhead until the job restarts. It 
should therefore simply omit sending barrier requests when the execution graph 
is not in a RUNNING state. 
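
The proposed guard could look roughly like the sketch below. It is plain Python rather than Flink code: `JobStatus`, `BarrierCoordinator` and the `send_barrier` callback are hypothetical stand-ins for the execution graph state and the StreamCheckpointCoordinator, and only the three states named above are modelled.

```python
from enum import Enum


class JobStatus(Enum):
    RUNNING = "RUNNING"
    FAILING = "FAILING"
    RESTARTING = "RESTARTING"


class BarrierCoordinator:
    """Hypothetical stand-in for a checkpoint coordinator (not Flink's class)."""

    def __init__(self, send_barrier):
        self._send_barrier = send_barrier  # callback that ships a barrier request
        self._next_id = 0

    def trigger(self, status):
        """Send a barrier only while the job is RUNNING; report whether one was sent."""
        if status is not JobStatus.RUNNING:
            return False  # FAILING or RESTARTING: skip the request entirely
        self._next_id += 1
        self._send_barrier(self._next_id)
        return True


sent = []
coordinator = BarrierCoordinator(sent.append)
results = [
    coordinator.trigger(status)
    for status in (JobStatus.RUNNING, JobStatus.FAILING,
                   JobStatus.RESTARTING, JobStatus.RUNNING)
]
print(results, sent)
```

Barrier ids only advance when a request is actually sent, so skipped ticks leave no gaps in the numbering.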



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Make a release to be announced at ApacheCon

2015-03-26 Thread Paris Carbone
+1 for an early release. It will help unblock the samoa PR that has 0.9 
dependencies.

 On 26 Mar 2015, at 11:44, Kostas Tzoumas ktzou...@apache.org wrote:
 
 +1 for an early milestone release. Perhaps we can call it 0.9-milestone or
 so?
 
 On Thu, Mar 26, 2015 at 11:01 AM, Robert Metzger rmetz...@apache.org
 wrote:
 
 Two weeks have passed since we've discussed the 0.9 release the last time.
 
 The ApacheCon is in 18 days from now.
 If we want, we can also release a 0.9.0-beta release that contains known
 bugs, but allows our users to try out the new features easily (because they
 are part of a release). The vote for such a release would be mainly about
 the legal aspects of the release rather than the stability. So I suspect
 that the vote will go through much quicker.
 
 
 
 On Fri, Mar 13, 2015 at 12:01 PM, Robert Metzger rmetz...@apache.org
 wrote:
 
 I've reopened https://issues.apache.org/jira/browse/FLINK-1650 because
 the issue is still occurring.
 
 On Thu, Mar 12, 2015 at 7:05 PM, Ufuk Celebi u...@apache.org wrote:
 
 On Thursday, March 12, 2015, Till Rohrmann till.rohrm...@gmail.com
 wrote:
 
 Have you run the 20 builds with the new shading code? With new shading the
 TaskManagerFailsITCase should no longer fail. If it still does, then we
 have to look into it again.
 
 
 No, rebased on Monday before shading. Let me rebase and rerun tonight.
 
 
 
 



Re: GSoC proposal

2015-03-26 Thread Paris Carbone
Hi Gabor,

Approximate statistics is a really good topic, I think there is a lot to do if 
you focus there. One idea would also be to include some of your contributions 
to the incremental machine learning library that will be available by June. 
From there you will be able to also use sampling and stream mining primitives 
out-of-the-box among others. Regarding window optimisations, as Gyula said, 
there is not much to do simply because we are working heavily on it already. 
Good luck and thanks for the proposal! 

Paris

 On 26 Mar 2015, at 22:59, Gyula Fóra gyula.f...@gmail.com wrote:
 
 Hey Gabor,
 
 Thank you for the proposal. It has many interesting ideas and a good
 potential.
 
 My comments:
 
 We already have a large amount of ongoing work on the windowing
 optimizations, covering your suggestions in section 1. It would be better
 to drop that part from the project because that's very heavily on the
 research side and as I said we are working on this at SICS.
 
 I like the list that you made for section 2, and this should be the main
 emphasis of the project. It would indeed be very nice to have a wide range
 of statistics that we can compute (or approximate - this should be optional
 though) on streams and windows (maybe we should also add some practical
 stuff like top-k, distinct etc).
 
 Here is a list of interesting papers that seem to be related to this
 project
 
 https://gist.github.com/debasishg/8172796
 
 Cheers,
 Gyula
 
 On Thu, Mar 26, 2015 at 7:50 PM, Gábor Gévay gga...@gmail.com wrote:
 
 Hello,
 
 I will be applying to the Google Summer of Code, and I wrote most of
 the proposal:
 http://compalg.inf.elte.hu/~ggevay/Proposal.pdf
 I would appreciate it if you could comment on it.
 
 Gyula Fora, git blame is telling me that you wrote most of the
 relevant parts of the windowing code, so I would be especially
 interested in what you think of my improvement ideas.
 
 Best regards,
 Gabor
 



[jira] [Created] (FLINK-1686) Streaming iteration heads cannot be instantiated

2015-03-11 Thread Paris Carbone (JIRA)
Paris Carbone created FLINK-1686:


 Summary: Streaming iteration heads cannot be instantiated
 Key: FLINK-1686
 URL: https://issues.apache.org/jira/browse/FLINK-1686
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Paris Carbone
Priority: Critical


It looks like streaming jobs with iterations and dop > 1 do not work currently. 
From what I see, when the TaskManager tries to instantiate a new 
RuntimeEnvironment for the iteration head tasks, it fails because the following 
exception is thrown:

java.lang.Exception: Failed to deploy the task Map (2/8) - execution #0 to slot SimpleSlot (0)(1) - 0e39fcabcab3e8543cc2d8320f9de783 - ALLOCATED/ALIVE: java.lang.Exception: Error setting up runtime environment: java.lang.RuntimeException: Could not register the given element, broker slot is already occupied.
    at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:174)
    at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$submitTask(TaskManager.scala:432)
    .
    .
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Could not register the given element, broker slot is already occupied.
    at org.apache.flink.streaming.api.streamvertex.StreamIterationHead.setInputsOutputs(StreamIterationHead.java:64)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:86)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:171)
    ... 20 more
Caused by: java.lang.RuntimeException: Could not register the given element, broker slot is already occupied.
    at org.apache.flink.runtime.iterative.concurrent.Broker.handIn(Broker.java:39)
    at org.apache.flink.streaming.api.streamvertex.StreamIterationHead.setInputsOutputs(StreamIterationHead.java:62)

The IterateTest passes since it uses a dop of 1, but it fails for higher 
parallelism. The IterateExample also fails if you try to run it. 

I will debug this once I find some time, so any ideas of what could possibly 
cause this are more than welcome. 
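
As a starting point for debugging, the sketch below reproduces the error message with a toy broker in plain Python. It rests on an unconfirmed assumption: that all parallel iteration heads hand in their element under the same broker key. `SlotBroker`, `register_heads` and the key functions are hypothetical and do not mirror the actual Broker or StreamIterationHead code.

```python
class SlotBroker:
    """Toy hand-over broker: each key may hold at most one element."""

    def __init__(self):
        self._slots = {}

    def hand_in(self, key, element):
        if key in self._slots:
            raise RuntimeError(
                "Could not register the given element, "
                "broker slot is already occupied.")
        self._slots[key] = element

    def size(self):
        return len(self._slots)


def register_heads(parallelism, key_for):
    """Let every parallel head subtask hand in its queue under key_for(subtask)."""
    broker = SlotBroker()
    for subtask in range(parallelism):
        broker.hand_in(key_for(subtask), f"queue-{subtask}")
    return broker.size()


def shared_key(subtask):
    return "iteration-0"  # assumed collision: same key for every subtask


def per_subtask_key(subtask):
    return f"iteration-0-{subtask}"  # one slot per parallel instance


print(register_heads(1, shared_key))       # a dop of 1 never collides
print(register_heads(8, per_subtask_key))  # distinct keys work for dop 8
try:
    register_heads(8, shared_key)
except RuntimeError as err:
    print("failed:", err)
```

If the real keys do turn out to be shared across subtasks, including the subtask index in the key would be one way out; if not, this sketch does not apply.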





Re: [DISCUSS] Iterative streaming example

2015-02-23 Thread Paris Carbone
Hello Peter,

Streaming machine learning algorithms make use of iterations quite widely. One 
simple example is implementing distributed stream learners. There, in many 
cases you need some central model aggregator, distributed estimators to offload 
the central node and of course feedback loops to merge everything back to the 
main aggregator periodically. One such example is the Vertical Hoeffding Tree 
Classifier (VHT) [1] that is implemented in Samoa.

Iterative streams are also useful for optimisation techniques as in batch 
processing (eg. trying different parameters to estimate a variable, getting 
back the accuracy from an evaluator and repeating until a condition is 
achieved).
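
A deterministic toy version of such a feedback loop, written in plain Python rather than the streaming API, might look like this. Each element carries a target and a current estimate; an update step refines the estimate, and elements that have not converged are fed back to the head of the loop. The Newton square-root update is only a stand-in for a real learner and evaluator, and all names are made up for the sketch.

```python
from collections import deque


def run_feedback_loop(elements, step, converged, max_rounds=100):
    """Apply `step` to each element and feed it back until `converged` holds."""
    pending = deque((element, 0) for element in elements)  # the feedback channel
    finished = []
    while pending:
        element, rounds = pending.popleft()
        element = step(element)
        if converged(element) or rounds + 1 >= max_rounds:
            finished.append(element)  # leaves the loop as output
        else:
            pending.append((element, rounds + 1))  # goes around again
    return finished


def newton_step(element):
    """One Newton iteration towards sqrt(target)."""
    target, estimate = element
    return (target, 0.5 * (estimate + target / estimate))


def close_enough(element):
    target, estimate = element
    return abs(estimate * estimate - target) < 1e-9


results = run_feedback_loop(
    [(4.0, 4.0), (9.0, 9.0), (2.0, 2.0)], newton_step, close_enough)
roots = {target: estimate for target, estimate in results}
print(roots)
```

Because there is no randomness, the output is the same on every run, which makes a loop of this shape easy to assert on in a test.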

I hope this helps to get a general idea of where iterations can be used.

[1] https://github.com/yahoo/samoa/wiki/Vertical-Hoeffding-Tree-Classifier


On 23 Feb 2015, at 12:13, Stephan Ewen se...@apache.org wrote:

I think that the Samoa people have quite a few nice examples along the
lines of model training with feedback.

@Paris: What would be the simplest example?

On Mon, Feb 23, 2015 at 11:27 AM, Szabó Péter nemderogator...@gmail.com wrote:

Does anyone know of a good, simple and realistic streaming iteration
example? The current example tests a random generator, but it should be
replaced by something deterministic in order to be testable.

Peter




Re: [DISCUSS] Dedicated streaming mode and start scripts

2015-02-18 Thread Paris Carbone
+1

I agree it’s a proper way to go.

On 18 Feb 2015, at 10:41, Max Michels m...@apache.org wrote:

+1

On Tue, Feb 17, 2015 at 2:40 PM, Aljoscha Krettek aljos...@apache.org wrote:
+1

On Tue, Feb 17, 2015 at 1:34 PM, Till Rohrmann trohrm...@apache.org wrote:
+1

On Tue, Feb 17, 2015 at 1:34 PM, Kostas Tzoumas ktzou...@apache.org wrote:

+1

On Tue, Feb 17, 2015 at 12:14 PM, Márton Balassi mbala...@apache.org wrote:

When it comes to the current use cases I'm for this separation.
@Ufuk: As Gyula has already pointed out with the current design of
integration it should not be a problem. Even if we submitted programs to
the wrong clusters it would only cause performance issues.

Eventually it would be nice to have an integrated cluster.

On Tue, Feb 17, 2015 at 11:55 AM, Ufuk Celebi u...@apache.org wrote:

I think this separation reflects the way that Flink is used currently
anyways. I would be in favor of it as well.

- What about the ongoing efforts (I think by Gyula) to combine both the
batch and stream processing APIs? I assume that this would only affect the
performance and wouldn't pose a fundamental problem there, would it?






Re: Serialization problem in Flink integration to SAMOA

2015-01-28 Thread Paris Carbone
fyi

The problem seems to be that samoa-api uses Kryo 2.17 while Flink uses 2.24.0. 
All flink-related tests pass if I upgrade samoa to 2.24.0. You can also ask on 
the samoa-incubating dev list whether that change would be OK. Maybe it would 
be good to test the same version on storm, samza and s4 as well to be sure.

Paris


 On 28 Jan 2015, at 12:52, F. Beligianni faybeligia...@gmail.com wrote:
 
 Hello,
 
 I am currently working on the integration of the Flink Streaming API into
 SAMOA and I have some problems with an exception that I get from the kryo
 serialiser:
 
 Caused by: java.lang.ArrayIndexOutOfBoundsException
     at java.lang.System.arraycopy(Native Method)
     at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:238)
     at org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read(SpillingAdaptiveSpanningRecordDeserializer.java:410)
     at org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68)
     at com.esotericsoftware.kryo.io.Input.fill(Input.java:134)
     at com.esotericsoftware.kryo.io.Input.require(Input.java:154)
     at com.esotericsoftware.kryo.io.Input.readInt(Input.java:303)
     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:103)
     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:596)
     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:707)
     at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:195)
 
 
 Specifically, I am working with Flink-0.9-SNAPSHOT and the exception is
 received in the custom class FlinkProcessingItem, which extends the
 StreamInvokable class, in the invoke function when the readNext() function
 of StreamInvokable is called.
 
 The object that is supposed to be received by the readNext function is a
 custom Tuple3 object, called SamoaType and defined like this:
 SamoaType extends Tuple3<String, ContentEvent, String>, where
 ContentEvent is an interface of SAMOA.
 
 The type information of the custom SamoaType is added to the source in the
 following way: TypeExtractor.getForObject
 
 The ContentEvent object that's sent between the two Invokables is of type
 InstanceContentEvent which implements ContentEvent, which you can find in
 the following link:
 InstanceContentEvent
 https://github.com/yahoo/samoa/blob/master/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstanceContentEvent.java
 .
 
 We managed to reproduce the exception in the following test program;
 TestSerialization
 https://github.com/senorcarbone/samoa/commit/9eba049031aee85d1bef58dcdaf37110b9fe4505
 .
 
 
 Lastly, I should mention that the same example runs in Storm, even though
 Storm also uses kryo.
 
 Thank you,
 Fay



Re: [flink-streaming] Regarding loops in the Job Graph

2015-01-22 Thread Paris Carbone
Thanks for the quick answers!
It is possible to use iterations: we could detect cycles while building the 
samoa topology and convert them into iterations. It is perhaps the proper way 
to go. I just wondered whether we could hack around it, but we had better avoid 
messing with cyclic dependencies. 
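
Detecting the cycles could be done with a depth-first search that reports back edges; each reported edge is then a candidate for becoming an iteration feedback edge. The sketch below is plain Python over an adjacency-list dictionary, and the node names in the example are invented. It is not the samoa or Flink topology builder.

```python
def find_back_edges(graph):
    """Return the edges (src, dst) that close a cycle in a directed graph.

    graph: dict mapping each node to a list of its downstream nodes.
    """
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current DFS path, done
    color = {node: WHITE for node in graph}
    for targets in graph.values():
        for target in targets:
            color.setdefault(target, WHITE)

    back_edges = []

    def visit(node):
        color[node] = GREY
        for nxt in graph.get(node, ()):
            if color[nxt] == GREY:
                back_edges.append((node, nxt))  # points back into the path
            elif color[nxt] == WHITE:
                visit(nxt)
        color[node] = BLACK

    for node in list(color):
        if color[node] == WHITE:
            visit(node)
    return back_edges


topology = {
    "source": ["learner"],
    "learner": ["aggregator"],
    "aggregator": ["learner", "sink"],  # aggregator feeds the learner again
    "sink": [],
}
print(find_back_edges(topology))
```

Removing the reported edges leaves an acyclic graph, so the rest of the topology can be built as usual while the back edges are wired up as iterations.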

Paris

 On 21 Jan 2015, at 19:36, Stephan Ewen se...@apache.org wrote:
  
 Hi Paris!
 
 The Streaming API allows you to define iterations, where parts of the
 stream are fed back. Do those work for you?
 
 In general, cyclic flows are a tricky thing, as the topological order of
 operators is needed for scheduling (may not be important for continuous
 streams) but also for a clear producer/consumer relationship, which is
 important for fault tolerance techniques.
 
 Currently, the JobManager topologically sorts the job graph and starts
 scheduling operators. I am surprised to hear that a graph with cyclic
 dependencies works...
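
 For reference, a topological sort in the style of Kahn's algorithm shows why a cyclic graph is a problem for this step. This is a plain Python sketch with made-up node names, not the JobManager's actual implementation: nodes on a cycle never reach in-degree zero, so no complete order exists.

```python
from collections import deque


def topological_order(graph):
    """Return the nodes in topological order, or raise if the graph has a cycle.

    graph: dict mapping each node to a list of its downstream nodes.
    """
    indegree = {node: 0 for node in graph}
    for targets in graph.values():
        for target in targets:
            indegree[target] = indegree.get(target, 0) + 1

    ready = deque(node for node, degree in indegree.items() if degree == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for target in graph.get(node, ()):
            indegree[target] -= 1
            if indegree[target] == 0:
                ready.append(target)

    if len(order) != len(indegree):
        raise ValueError("graph contains a cycle; no topological order exists")
    return order


acyclic = {"source": ["map"], "map": ["sink"], "sink": []}
cyclic = {"source": ["head"], "head": ["tail"], "tail": ["head"]}

print(topological_order(acyclic))
try:
    topological_order(cyclic)
except ValueError as err:
    print("rejected:", err)
```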
 
 
 Stephan
 
 
 
 
 
 
 On Wed, Jan 21, 2015 at 2:57 AM, Paris Carbone par...@kth.se wrote:
 
 Hello,
 
 While implementing the SAMOA adapter for Flink-Streaming we stumbled upon
 the need to allow loops (or circular dependencies) in the job graph. Many
 incremental machine learning tasks define loops already and there is no
 trivial way of getting around it. In the streaming job graph builder there
 is only a check that does not allow the user to submit graphs with loops,
 however, from what Gyula told me, if the check is removed the streaming job
 runs as expected. Is there (still) a major reason for having this check, at
 least in the streaming component?
 
 Paris