Re: @TearDown guarantees

2018-02-18 Thread Romain Manni-Bucau
Yes, 1M. Let's try to explain by simplifying the overall execution. Each
instance - one fn, so likely in a thread of a worker - has its lifecycle.
Caricaturally: "new" and garbage collection.

In practice, "new" is often an unsafe allocate (deserialization) but it
doesn't matter here.

What I want is for any "new" to be followed by a setup before any
processElement or startBundle, and for teardown to be called the last time
Beam has the instance, after the last finishBundle and before it is gc-ed.

It is as simple as that.
This way there is no need to combine fns in a way that makes a fn not
self-contained in order to implement basic transforms.
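
For illustration, a minimal sketch of the per-instance ordering described
above, using the existing DoFn lifecycle annotations (the resource held by the
instance is just a stand-in; a real fn would open a connection, client, etc.):

  import org.apache.beam.sdk.transforms.DoFn;

  class LifecycleFn extends DoFn<String, String> {
    // stand-in for a per-instance resource acquired in @Setup
    private transient AutoCloseable resource;

    @Setup
    public void setup() {
      // runs once per instance, after "new"/deserialization and before any bundle
      resource = () -> { };
    }

    @StartBundle
    public void startBundle() {
      // runs before the elements of each bundle
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      c.output(c.element());
    }

    @FinishBundle
    public void finishBundle() {
      // runs after the elements of each bundle
    }

    @Teardown
    public void teardown() throws Exception {
      // the call whose guarantee is debated here: last chance before the instance is gc-ed
      resource.close();
    }
  }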

On 18 Feb 2018 20:07, "Reuven Lax" wrote:

>
>
> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <
> rmannibu...@gmail.com> wrote:
>
>>
>>
>> On 18 Feb 2018 19:28, "Ben Chambers" wrote:
>>
>> It feels like this thread may be a bit off-track. Rather than focusing on
>> the semantics of the existing methods -- which have been noted to meet
>> many existing use cases -- it would be helpful to focus more on the
>> reason you are looking for something with different semantics.
>>
>> Some possibilities (I'm not sure which one you are trying to do):
>>
>> 1. Clean-up some external, global resource, that was initialized once
>> during the startup of the pipeline. If this is the case, how are you
>> ensuring it was really only initialized once (and not once per worker, per
>> thread, per instance, etc.)? How do you know when the pipeline should
>> release it? If the answer is "when it reaches step X", then what about a
>> streaming pipeline?
>>
>>
>> When the DoFn is no longer needed logically, i.e. when the batch is done or
>> the stream is stopped (manually or by a JVM shutdown)
>>
>
> I'm really not following what this means.
>
> Let's say that a pipeline is running 1000 workers, and each worker is
> running 1000 threads (each running a copy of the same DoFn). How many
> cleanups do you want (do you want 1000 * 1000 = 1M cleanups) and when do
> you want it called? When the entire pipeline is shut down? When an
> individual worker is about to shut down (which may be temporary - may be
> about to start back up)? Something else?
>
>
>
>>
>>
>>
>> 2. Finalize some resources that are used within some region of the
>> pipeline. While the DoFn lifecycle methods are not a good fit for this
>> (they are focused on managing resources within the DoFn), you could model
>> this on how FileIO finalizes the files that it produced. For instance:
>>a) ParDo generates "resource IDs" (or some token that stores
>> information about resources)
>>b) "Require Deterministic Input" (to prevent retries from changing
>> resource IDs)
>>c) ParDo that initializes the resources
>>d) Pipeline segments that use the resources, and eventually output the
>> fact they're done
>>e) "Require Deterministic Input"
>>f) ParDo that frees the resources
>>
>> By making the use of the resource part of the data it is possible to
>> "checkpoint" which resources may be in use or have been finished by using
>> the require deterministic input. This is important to ensuring everything
>> is actually cleaned up.
>>
>>
>> I need that, but generic and not case by case, to industrialize some API on
>> top of Beam.
>>
>>
>>
>> 3. Some other use case that I may be missing? If it is this case, could
>> you elaborate on what you are trying to accomplish? That would help me
>> understand both the problems with existing options and possibly what could
>> be done to help.
>>
>>
>> I understand there are workarounds for almost all cases, but that means each
>> transform is different in its lifecycle handling. I dislike that a lot
>> at scale and as a user, since you can't put any unified practice on top of
>> Beam; it also makes Beam very hard to integrate or to use to build
>> higher-level libraries or software.
>>
>> This is why I tried to not start the workaround discussions and just stay
>> at the API level.
>>
>>
>>
>> -- Ben
>>
>>
>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau 
>> wrote:
>>
>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov :
>>>
 "Machine state" is overly low-level because many of the possible
 reasons can happen on a perfectly fine machine.
 If you'd like to rephrase it to "it will be called except in various
 situations where it's logically impossible or impractical to guarantee that
 it's called", that's fine. Or you can list some of the examples above.

>>>
>>> Sounds ok to me
>>>
>>>

 The main point for the user is, you *will* see non-preventable
 situations where it couldn't be called - it's not just intergalactic
 crashes - so if the logic is very important (e.g. cleaning up a large
 amount of temporary files, shutting down a large number of VMs you started
 etc), you have to express it using one of the other methods that have
 stricter guarantees (which obviously come at a cost, e.g. 

Re: @TearDown guarantees

2018-02-18 Thread Eugene Kirpichov
The kind of whole-transform lifecycle you're mentioning can be accomplished
using the Wait transform as I suggested in the thread above, and I believe
it should become the canonical way to do that.
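
For illustration, a rough sketch of that shape (the transform names and Create
inputs are placeholders; the point is only that the cleanup branch is held back
by Wait.on until the gating collection is done):

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.Wait;
  import org.apache.beam.sdk.values.PCollection;

  public class WaitCleanupSketch {
    public static void main(String[] args) {
      Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

      // Stage whose completion gates the cleanup (stands in for "write temp data", etc.).
      PCollection<String> written = p
          .apply(Create.of("a", "b", "c"))
          .apply("Work", ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void process(ProcessContext c) {
              c.output(c.element());
            }
          }));

      // The cleanup input is only emitted by Wait.on once "written" is complete
      // (per window), so the cleanup runs after the whole upstream stage,
      // not per DoFn instance.
      p.apply(Create.of("cleanup-token"))
          .apply(Wait.on(written))
          .apply("Cleanup", ParDo.of(new DoFn<String, Void>() {
            @ProcessElement
            public void process(ProcessContext c) {
              // release the shared resource here
            }
          }));

      p.run().waitUntilFinish();
    }
  }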

(I would like to reiterate one more time, as the main author of most design
documents related to SDF and of its implementation in the Java direct and
Dataflow runners, that SDF is fully unrelated to the topic of cleanup - I'm
very confused as to why it keeps coming up.)

On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau 
wrote:

> I kind of agree except transforms lack a lifecycle too. My understanding
> is that sdf could be a way to unify it and clean the api.
>
> Otherwise how to normalize - single api -  lifecycle of transforms?
>
> On 18 Feb 2018 21:32, "Ben Chambers" wrote:
>
>> Are you sure that focusing on the cleanup of specific DoFn's is
>> appropriate? In many cases where cleanup is necessary, it is around an entire
>> composite PTransform. I think there have been discussions/proposals around
>> a more methodical "cleanup" option, but those haven't been implemented, to
>> the best of my knowledge.
>>
>> For instance, consider the steps of a FileIO:
>> 1. Write to a bunch (N shards) of temporary files
>> 2. When all temporary files are complete, attempt to do a bulk copy to
>> put them in the final destination.
>> 3. Cleanup all the temporary files.
>>
>> (This is often desirable because it minimizes the chance of seeing
>> partial/incomplete results in the final destination).
>>
>> In the above, you'd want step 1 to execute on many workers, likely using
>> a ParDo (say N different workers).
>> The move step should only happen once, so on one worker. This means it
>> will be a different DoFn, likely with some stuff done to ensure it runs on
>> one worker.
>>
>> In such a case, cleanup / @TearDown of the DoFn is not enough. We need an
>> API for a PTransform to schedule some cleanup work for when the transform
>> is "done". In batch this is relatively straightforward, but doesn't exist.
>> This is the source of some problems, such as BigQuery sink leaving files
>> around that have failed to import into BigQuery.
>>
>> In streaming this is less straightforward -- do you want to wait until
>> the end of the pipeline? Or do you want to wait until the end of the
>> window? In practice, you just want to wait until you know nobody will need
>> the resource anymore.
>>
>> This led to some discussions around a "cleanup" API, where you could have
>> a transform that output resource objects. Each resource object would have
>> logic for cleaning it up. And there would be something that indicated what
>> parts of the pipeline needed that resource, and what kind of temporal
>> lifetime those objects had. As soon as that part of the pipeline had
>> advanced far enough that it would no longer need the resources, they would
>> get cleaned up. This can be done at pipeline shutdown, or incrementally
>> during a streaming pipeline, etc.
>>
>> Would something like this be a better fit for your use case? If not, why
>> is handling teardown within a single DoFn sufficient?
>>
>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <
>> rmannibu...@gmail.com> wrote:
>>
>>> Yes, 1M. Let's try to explain by simplifying the overall execution. Each
>>> instance - one fn, so likely in a thread of a worker - has its lifecycle.
>>> Caricaturally: "new" and garbage collection.
>>>
>>> In practice, "new" is often an unsafe allocate (deserialization) but it
>>> doesn't matter here.
>>>
>>> What I want is for any "new" to be followed by a setup before any
>>> processElement or startBundle, and for teardown to be called the last time
>>> Beam has the instance, after the last finishBundle and before it is gc-ed.
>>>
>>> It is as simple as that.
>>> This way there is no need to combine fns in a way that makes a fn not
>>> self-contained in order to implement basic transforms.
>>>
>>> On 18 Feb 2018 20:07, "Reuven Lax" wrote:
>>>


 On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <
 rmannibu...@gmail.com> wrote:

>
>
> On 18 Feb 2018 19:28, "Ben Chambers" wrote:
>
> It feels like this thread may be a bit off-track. Rather than focusing
> on the semantics of the existing methods -- which have been noted to
> meet many existing use cases -- it would be helpful to focus more on
> the reason you are looking for something with different semantics.
>
> Some possibilities (I'm not sure which one you are trying to do):
>
> 1. Clean-up some external, global resource, that was initialized once
> during the startup of the pipeline. If this is the case, how are you
> ensuring it was really only initialized once (and not once per worker, per
> thread, per instance, etc.)? How do you know when the pipeline should
> release it? If the answer is "when it reaches step X", then what about a
> streaming pipeline?

Re: Euphoria Java 8 DSL - proposal

2018-02-18 Thread Jean-Baptiste Onofré
Hi Davor,

We still have some discussion/paperwork on Euphoria side (SGA, ...).

So, it's on track but it takes a little more time than expected.

Regards
JB

On 02/19/2018 05:40 AM, Davor Bonaci wrote:
> I may have missed things, but any update on the progress of this donation?
> 
> On Tue, Jan 2, 2018 at 10:52 PM, Jean-Baptiste Onofré wrote:
> 
> Great !
> 
> Thanks !
> Regards
> JB
> 
> On 01/03/2018 07:29 AM, David Morávek wrote:
> 
> Hello JB,
> 
> Perfect! I'm already on the Beam Slack workspace, I'll contact you 
> once
> I get to the office.
> 
> Thanks!
> D.
> 
> On Wed, Jan 3, 2018 at 6:19 AM, Jean-Baptiste Onofré wrote:
> 
>     Hi David,
> 
>     absolutely !! Let's move forward on the preparation steps.
> 
>     Are you on Slack and/or hangout to plan this ?
> 
>     Thanks,
>     Regards
>     JB
> 
>     On 01/02/2018 05:35 PM, David Morávek wrote:
> 
>         Hello JB,
> 
>         can we help in any way to move things forward?
> 
>         Thanks,
>         D.
> 
>         On Mon, Dec 18, 2017 at 4:28 PM, Jean-Baptiste Onofré wrote:
>              Thanks Jan,
> 
>              It makes sense.
> 
>              Let me take a look on the code to understand the 
> "interaction".
> 
>              Regards
>              JB
> 
> 
>              On 12/18/2017 04:26 PM, Jan Lukavský wrote:
> 
>                  Hi JB,
> 
>                  basically you are not wrong. The project started 
> about
> three or
>         four
>                  years ago with a goal to unify batch and streaming
> processing into
>                  single portable, executor independent API. Because of
> that, it is
>                  currently "close" to Beam in this sense. But we don't
> see much
>         added
>                  value keeping this as a separate project, with one of
> the key
>                  differences to be the API (not the model itself), so 
> we
> would
>         like to
>                  focus on translation from Euphoria API to Beam's SDK.
> That's why we
>                  would like to see it as a DSL, so that it would be
> possible to use
>                  Euphoria API with Beam's runners as much natively as
> possible.
> 
>                  I hope I didn't make the subject even more unclear, 
> if
> so, I'll
>         be happy
>                  to explain anything in more detail. :-)
> 
>                      Jan
> 
> 
>                  On 12/18/2017 04:08 PM, Jean-Baptiste Onofré wrote:
> 
>                      Hi Jan,
> 
>                      Thanks for your answers.
> 
>                      However, they confused me ;)
> 
>                      Regarding what you replied, Euphoria seems like a
> programming
>                      model/SDK "close" to Beam more than a DSL on top 
> of an
>         existing Beam
>                      SDK.
> 
>                      Am I wrong ?
> 
>                      Regards
>                      JB
> 
>                      On 12/18/2017 03:44 PM, Jan Lukavský wrote:
> 
>                          Hi Ismael,
> 
>                          basically we adopted the Beam's design regarding
>                          partitioning (https://github.com/seznam/euphoria/issues/160)
>                          and implemented the sorting manually
>                          (https://github.com/seznam/euphoria/issues/158

Re: @TearDown guarantees

2018-02-18 Thread Ben Chambers
Are you sure that focusing on the cleanup of specific DoFn's is
appropriate? In many cases where cleanup is necessary, it is around an entire
composite PTransform. I think there have been discussions/proposals around
a more methodical "cleanup" option, but those haven't been implemented, to
the best of my knowledge.

For instance, consider the steps of a FileIO:
1. Write to a bunch (N shards) of temporary files
2. When all temporary files are complete, attempt to do a bulk copy to put
them in the final destination.
3. Cleanup all the temporary files.

(This is often desirable because it minimizes the chance of seeing
partial/incomplete results in the final destination).

In the above, you'd want step 1 to execute on many workers, likely using a
ParDo (say N different workers).
The move step should only happen once, so on one worker. This means it will
be a different DoFn, likely with some stuff done to ensure it runs on one
worker.
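
For illustration, a rough sketch of that shape - not FileIO's actual code. It
assumes the temp and final locations are on storage visible to every worker,
and the inputs are placeholders:

  import java.io.IOException;
  import java.nio.charset.StandardCharsets;
  import java.nio.file.Files;
  import java.nio.file.Path;
  import java.nio.file.Paths;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.GroupByKey;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  class TempThenMoveSketch {
    /** records: the lines to write; finalDir: a destination visible to all workers. */
    static void writeMoveCleanup(PCollection<String> records, String finalDir) {
      // 1. Many workers each write a temp file and emit its path.
      PCollection<String> tempPaths = records.apply("WriteTemp",
          ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void process(ProcessContext c) throws IOException {
              Path tmp = Files.createTempFile("shard-", ".txt");
              Files.write(tmp, c.element().getBytes(StandardCharsets.UTF_8));
              c.output(tmp.toString());
            }
          }));

      // 2 + 3. Gather every path under a single key so the move runs once, on one
      // worker, then bulk-copy to the final destination and delete the temp files.
      tempPaths
          .apply("KeyByConstant", ParDo.of(new DoFn<String, KV<String, String>>() {
            @ProcessElement
            public void process(ProcessContext c) {
              c.output(KV.of("all", c.element()));
            }
          }))
          .apply(GroupByKey.<String, String>create())
          .apply("MoveThenCleanup", ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
            @ProcessElement
            public void process(ProcessContext c) throws IOException {
              for (String tmp : c.element().getValue()) {
                Path src = Paths.get(tmp);
                Files.copy(src, Paths.get(finalDir).resolve(src.getFileName()));
                Files.delete(src);  // step 3: cleanup of the temp file
              }
            }
          }));
    }
  }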

In such a case, cleanup / @TearDown of the DoFn is not enough. We need an
API for a PTransform to schedule some cleanup work for when the transform
is "done". In batch this is relatively straightforward, but doesn't exist.
This is the source of some problems, such as BigQuery sink leaving files
around that have failed to import into BigQuery.

In streaming this is less straightforward -- do you want to wait until the
end of the pipeline? Or do you want to wait until the end of the window? In
practice, you just want to wait until you know nobody will need the
resource anymore.

This led to some discussions around a "cleanup" API, where you could have a
transform that output resource objects. Each resource object would have
logic for cleaning it up. And there would be something that indicated what
parts of the pipeline needed that resource, and what kind of temporal
lifetime those objects had. As soon as that part of the pipeline had
advanced far enough that it would no longer need the resources, they would
get cleaned up. This can be done at pipeline shutdown, or incrementally
during a streaming pipeline, etc.
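
To make that idea concrete, a purely hypothetical shape such an API could
take - nothing like this exists in Beam today, and the names are invented:

  import java.io.Serializable;

  /** A value a transform could output to describe something that must eventually be released. */
  interface CleanableResource extends Serializable {
    /** Called once the pipeline region that needs the resource has advanced past it. */
    void cleanup() throws Exception;
  }

  /** Example: a temporary directory that should be deleted when no longer referenced. */
  class TempDirResource implements CleanableResource {
    private final String path;

    TempDirResource(String path) {
      this.path = path;
    }

    @Override
    public void cleanup() throws Exception {
      // recursively delete the directory at 'path'; omitted here for brevity
    }
  }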

Would something like this be a better fit for your use case? If not, why is
handling teardown within a single DoFn sufficient?

On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau 
wrote:

> Yes, 1M. Let's try to explain by simplifying the overall execution. Each
> instance - one fn, so likely in a thread of a worker - has its lifecycle.
> Caricaturally: "new" and garbage collection.
>
> In practice, "new" is often an unsafe allocate (deserialization) but it
> doesn't matter here.
>
> What I want is for any "new" to be followed by a setup before any
> processElement or startBundle, and for teardown to be called the last time
> Beam has the instance, after the last finishBundle and before it is gc-ed.
>
> It is as simple as that.
> This way there is no need to combine fns in a way that makes a fn not
> self-contained in order to implement basic transforms.
>
> On 18 Feb 2018 20:07, "Reuven Lax" wrote:
>
>>
>>
>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <
>> rmannibu...@gmail.com> wrote:
>>
>>>
>>>
>>> On 18 Feb 2018 19:28, "Ben Chambers" wrote:
>>>
>>> It feels like this thread may be a bit off-track. Rather than focusing on
>>> the semantics of the existing methods -- which have been noted to meet
>>> many existing use cases -- it would be helpful to focus more on the
>>> reason you are looking for something with different semantics.
>>>
>>> Some possibilities (I'm not sure which one you are trying to do):
>>>
>>> 1. Clean-up some external, global resource, that was initialized once
>>> during the startup of the pipeline. If this is the case, how are you
>>> ensuring it was really only initialized once (and not once per worker, per
>>> thread, per instance, etc.)? How do you know when the pipeline should
>>> release it? If the answer is "when it reaches step X", then what about a
>>> streaming pipeline?
>>>
>>>
>>> When the DoFn is no longer needed logically, i.e. when the batch is done or
>>> the stream is stopped (manually or by a JVM shutdown)
>>>
>>
>> I'm really not following what this means.
>>
>> Let's say that a pipeline is running 1000 workers, and each worker is
>> running 1000 threads (each running a copy of the same DoFn). How many
>> cleanups do you want (do you want 1000 * 1000 = 1M cleanups) and when do
>> you want it called? When the entire pipeline is shut down? When an
>> individual worker is about to shut down (which may be temporary - may be
>> about to start back up)? Something else?
>>
>>
>>
>>>
>>>
>>>
>>> 2. Finalize some resources that are used within some region of the
>>> pipeline. While the DoFn lifecycle methods are not a good fit for this
>>> (they are focused on managing resources within the DoFn), you could model
>>> this on how FileIO finalizes the files that it produced. For instance:
>>>a) ParDo generates "resource IDs" (or some token that stores
>>> information about resources)
>>>b) "Require 

Re: @TearDown guarantees

2018-02-18 Thread Romain Manni-Bucau
I kind of agree except transforms lack a lifecycle too. My understanding is
that sdf could be a way to unify it and clean the api.

Otherwise how to normalize - single api -  lifecycle of transforms?

On 18 Feb 2018 21:32, "Ben Chambers" wrote:

> Are you sure that focusing on the cleanup of specific DoFn's is
> appropriate? In many cases where cleanup is necessary, it is around an entire
> composite PTransform. I think there have been discussions/proposals around
> a more methodical "cleanup" option, but those haven't been implemented, to
> the best of my knowledge.
>
> For instance, consider the steps of a FileIO:
> 1. Write to a bunch (N shards) of temporary files
> 2. When all temporary files are complete, attempt to do a bulk copy to put
> them in the final destination.
> 3. Cleanup all the temporary files.
>
> (This is often desirable because it minimizes the chance of seeing
> partial/incomplete results in the final destination).
>
> In the above, you'd want step 1 to execute on many workers, likely using a
> ParDo (say N different workers).
> The move step should only happen once, so on one worker. This means it
> will be a different DoFn, likely with some stuff done to ensure it runs on
> one worker.
>
> In such a case, cleanup / @TearDown of the DoFn is not enough. We need an
> API for a PTransform to schedule some cleanup work for when the transform
> is "done". In batch this is relatively straightforward, but doesn't exist.
> This is the source of some problems, such as BigQuery sink leaving files
> around that have failed to import into BigQuery.
>
> In streaming this is less straightforward -- do you want to wait until the
> end of the pipeline? Or do you want to wait until the end of the window? In
> practice, you just want to wait until you know nobody will need the
> resource anymore.
>
> This led to some discussions around a "cleanup" API, where you could have
> a transform that output resource objects. Each resource object would have
> logic for cleaning it up. And there would be something that indicated what
> parts of the pipeline needed that resource, and what kind of temporal
> lifetime those objects had. As soon as that part of the pipeline had
> advanced far enough that it would no longer need the resources, they would
> get cleaned up. This can be done at pipeline shutdown, or incrementally
> during a streaming pipeline, etc.
>
> Would something like this be a better fit for your use case? If not, why
> is handling teardown within a single DoFn sufficient?
>
> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau 
> wrote:
>
>> Yes, 1M. Let's try to explain by simplifying the overall execution. Each
>> instance - one fn, so likely in a thread of a worker - has its lifecycle.
>> Caricaturally: "new" and garbage collection.
>>
>> In practice, "new" is often an unsafe allocate (deserialization) but it
>> doesn't matter here.
>>
>> What I want is for any "new" to be followed by a setup before any
>> processElement or startBundle, and for teardown to be called the last time
>> Beam has the instance, after the last finishBundle and before it is gc-ed.
>>
>> It is as simple as that.
>> This way there is no need to combine fns in a way that makes a fn not
>> self-contained in order to implement basic transforms.
>>
>> On 18 Feb 2018 20:07, "Reuven Lax" wrote:
>>
>>>
>>>
>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>


 On 18 Feb 2018 19:28, "Ben Chambers" wrote:

 It feels like this thread may be a bit off-track. Rather than focusing on
 the semantics of the existing methods -- which have been noted to meet
 many existing use cases -- it would be helpful to focus more on the
 reason you are looking for something with different semantics.

 Some possibilities (I'm not sure which one you are trying to do):

 1. Clean-up some external, global resource, that was initialized once
 during the startup of the pipeline. If this is the case, how are you
 ensuring it was really only initialized once (and not once per worker, per
 thread, per instance, etc.)? How do you know when the pipeline should
 release it? If the answer is "when it reaches step X", then what about a
 streaming pipeline?


 When the DoFn is no longer needed logically, i.e. when the batch is done or
 the stream is stopped (manually or by a JVM shutdown)

>>>
>>> I'm really not following what this means.
>>>
>>> Let's say that a pipeline is running 1000 workers, and each worker is
>>> running 1000 threads (each running a copy of the same DoFn). How many
>>> cleanups do you want (do you want 1000 * 1000 = 1M cleanups) and when do
>>> you want it called? When the entire pipeline is shut down? When an
>>> individual worker is about to shut down (which may be temporary - may be
>>> about to start back up)? Something else?
>>>
>>>
>>>



 2. 

Re: Euphoria Java 8 DSL - proposal

2018-02-18 Thread Davor Bonaci
I may have missed things, but any update on the progress of this donation?

On Tue, Jan 2, 2018 at 10:52 PM, Jean-Baptiste Onofré 
wrote:

> Great !
>
> Thanks !
> Regards
> JB
>
> On 01/03/2018 07:29 AM, David Morávek wrote:
>
>> Hello JB,
>>
>> Perfect! I'm already on the Beam Slack workspace, I'll contact you once I
>> get to the office.
>>
>> Thanks!
>> D.
>>
>> On Wed, Jan 3, 2018 at 6:19 AM, Jean-Baptiste Onofré wrote:
>>
>> Hi David,
>>
>> absolutely !! Let's move forward on the preparation steps.
>>
>> Are you on Slack and/or hangout to plan this ?
>>
>> Thanks,
>> Regards
>> JB
>>
>> On 01/02/2018 05:35 PM, David Morávek wrote:
>>
>> Hello JB,
>>
>> can we help in any way to move things forward?
>>
>> Thanks,
>> D.
>>
>> On Mon, Dec 18, 2017 at 4:28 PM, Jean-Baptiste Onofré <
>> j...@nanthrax.net> wrote:
>>
>>  Thanks Jan,
>>
>>  It makes sense.
>>
>>  Let me take a look on the code to understand the
>> "interaction".
>>
>>  Regards
>>  JB
>>
>>
>>  On 12/18/2017 04:26 PM, Jan Lukavský wrote:
>>
>>  Hi JB,
>>
>>  basically you are not wrong. The project started about
>> three or
>> four
>>  years ago with a goal to unify batch and streaming
>> processing into
>>  single portable, executor independent API. Because of
>> that, it is
>>  currently "close" to Beam in this sense. But we don't
>> see much
>> added
>>  value keeping this as a separate project, with one of
>> the key
>>  differences to be the API (not the model itself), so we
>> would
>> like to
>>  focus on translation from Euphoria API to Beam's SDK.
>> That's why we
>>  would like to see it as a DSL, so that it would be
>> possible to use
>>  Euphoria API with Beam's runners as much natively as
>> possible.
>>
>>  I hope I didn't make the subject even more unclear, if
>> so, I'll
>> be happy
>>  to explain anything in more detail. :-)
>>
>>  Jan
>>
>>
>>  On 12/18/2017 04:08 PM, Jean-Baptiste Onofré wrote:
>>
>>  Hi Jan,
>>
>>  Thanks for your answers.
>>
>>  However, they confused me ;)
>>
>>  Regarding what you replied, Euphoria seems like a
>> programming
>>  model/SDK "close" to Beam more than a DSL on top of
>> an
>> existing Beam
>>  SDK.
>>
>>  Am I wrong ?
>>
>>  Regards
>>  JB
>>
>>  On 12/18/2017 03:44 PM, Jan Lukavský wrote:
>>
>>  Hi Ismael,
>>
>>  basically we adopted the Beam's design regarding partitioning
>>  (https://github.com/seznam/euphoria/issues/160) and implemented
>>  the sorting manually
>>  (https://github.com/seznam/euphoria/issues/158). I'm not aware
>>  of the time model differences (Euphoria supports ingestion and
>>  event time, we don't support processing time by decision).
>>  Regarding other differences (looking into Beam capability
>>  matrix, I'd say that):
>>
>>  - we don't support stateful FlatMap (i.e. ParDo) for now
>>    (https://github.com/seznam/euphoria/issues/192)
>>
>>  - we don't support side inputs (by decision now, but might be
>>    reconsidered) and outputs
>>    (https://github.com/seznam/euphoria/issues/124)
>>
>>  - we support complete event-time 

Re: @TearDown guarantees

2018-02-18 Thread Reuven Lax
On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau 
wrote:

>
>
> On 18 Feb 2018 19:28, "Ben Chambers" wrote:
>
> It feels like this thread may be a bit off-track. Rather than focusing on
> the semantics of the existing methods -- which have been noted to meet
> many existing use cases -- it would be helpful to focus more on the
> reason you are looking for something with different semantics.
>
> Some possibilities (I'm not sure which one you are trying to do):
>
> 1. Clean-up some external, global resource, that was initialized once
> during the startup of the pipeline. If this is the case, how are you
> ensuring it was really only initialized once (and not once per worker, per
> thread, per instance, etc.)? How do you know when the pipeline should
> release it? If the answer is "when it reaches step X", then what about a
> streaming pipeline?
>
>
> When the DoFn is no longer needed logically, i.e. when the batch is done or
> the stream is stopped (manually or by a JVM shutdown)
>

I'm really not following what this means.

Let's say that a pipeline is running 1000 workers, and each worker is
running 1000 threads (each running a copy of the same DoFn). How many
cleanups do you want (do you want 1000 * 1000 = 1M cleanups) and when do
you want it called? When the entire pipeline is shut down? When an
individual worker is about to shut down (which may be temporary - may be
about to start back up)? Something else?



>
>
>
> 2. Finalize some resources that are used within some region of the
> pipeline. While the DoFn lifecycle methods are not a good fit for this
> (they are focused on managing resources within the DoFn), you could model
> this on how FileIO finalizes the files that it produced. For instance:
>a) ParDo generates "resource IDs" (or some token that stores
> information about resources)
>b) "Require Deterministic Input" (to prevent retries from changing
> resource IDs)
>c) ParDo that initializes the resources
>d) Pipeline segments that use the resources, and eventually output the
> fact they're done
>e) "Require Deterministic Input"
>f) ParDo that frees the resources
>
> By making the use of the resource part of the data it is possible to
> "checkpoint" which resources may be in use or have been finished by using
> the require deterministic input. This is important to ensuring everything
> is actually cleaned up.
>
>
> I need that, but generic and not case by case, to industrialize some API on
> top of Beam.
>
>
>
> 3. Some other use case that I may be missing? If it is this case, could
> you elaborate on what you are trying to accomplish? That would help me
> understand both the problems with existing options and possibly what could
> be done to help.
>
>
> I understand there are workarounds for almost all cases, but that means each
> transform is different in its lifecycle handling. I dislike that a lot
> at scale and as a user, since you can't put any unified practice on top of
> Beam; it also makes Beam very hard to integrate or to use to build
> higher-level libraries or software.
>
> This is why I tried to not start the workaround discussions and just stay
> at the API level.
>
>
>
> -- Ben
>
>
> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau 
> wrote:
>
>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov :
>>
>>> "Machine state" is overly low-level because many of the possible reasons
>>> can happen on a perfectly fine machine.
>>> If you'd like to rephrase it to "it will be called except in various
>>> situations where it's logically impossible or impractical to guarantee that
>>> it's called", that's fine. Or you can list some of the examples above.
>>>
>>
>> Sounds ok to me
>>
>>
>>>
>>> The main point for the user is, you *will* see non-preventable
>>> situations where it couldn't be called - it's not just intergalactic
>>> crashes - so if the logic is very important (e.g. cleaning up a large
>>> amount of temporary files, shutting down a large number of VMs you started
>>> etc), you have to express it using one of the other methods that have
>>> stricter guarantees (which obviously come at a cost, e.g. no
>>> pass-by-reference).
>>>
>>
>> FinishBundle has the exact same guarantee sadly, so I am not sure which
>> other method you speak about. Concretely, if you make it really unreliable -
>> this is what best effort sounds like to me - then users cannot use it to
>> clean anything, but if you make it "can happen, but it is unexpected and
>> means something happened", then it is fine to have a manual - or auto if
>> fancy - recovery procedure. This is where it makes all the difference and
>> impacts the developers and ops (all users basically).
>>
>>
>>>
>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
 Agree Eugene except that "best effort" means that. It is also often
 used to say "at will" and this is what triggered this thread.

 

Re: @TearDown guarantees

2018-02-18 Thread Reuven Lax
On Sun, Feb 18, 2018 at 11:07 AM, Reuven Lax  wrote:

>
>
> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <
> rmannibu...@gmail.com> wrote:
>
>>
>>
>> On 18 Feb 2018 19:28, "Ben Chambers" wrote:
>>
>> It feels like this thread may be a bit off-track. Rather than focusing on
>> the semantics of the existing methods -- which have been noted to meet
>> many existing use cases -- it would be helpful to focus more on the
>> reason you are looking for something with different semantics.
>>
>> Some possibilities (I'm not sure which one you are trying to do):
>>
>> 1. Clean-up some external, global resource, that was initialized once
>> during the startup of the pipeline. If this is the case, how are you
>> ensuring it was really only initialized once (and not once per worker, per
>> thread, per instance, etc.)? How do you know when the pipeline should
>> release it? If the answer is "when it reaches step X", then what about a
>> streaming pipeline?
>>
>>
>> When the DoFn is no longer needed logically, i.e. when the batch is done or
>> the stream is stopped (manually or by a JVM shutdown)
>>
>
> I'm really not following what this means.
>
> Let's say that a pipeline is running 1000 workers, and each worker is
> running 1000 threads (each running a copy of the same DoFn). How many
> cleanups do you want (do you want 1000 * 1000 = 1M cleanups) and when do
> you want it called? When the entire pipeline is shut down? When an
> individual worker is about to shut down (which may be temporary - may be
> about to start back up)? Something else?
>

Maybe you can explain the use case a bit more to me. Most resources I'm
aware of that are "sticky" and need cleanup despite worker crashes (e.g.
creating a VM), are also not resources you want to be creating and
destroying millions of times.


>
>
>
>>
>>
>>
>> 2. Finalize some resources that are used within some region of the
>> pipeline. While the DoFn lifecycle methods are not a good fit for this
>> (they are focused on managing resources within the DoFn), you could model
>> this on how FileIO finalizes the files that it produced. For instance:
>>a) ParDo generates "resource IDs" (or some token that stores
>> information about resources)
>>b) "Require Deterministic Input" (to prevent retries from changing
>> resource IDs)
>>c) ParDo that initializes the resources
>>d) Pipeline segments that use the resources, and eventually output the
>> fact they're done
>>e) "Require Deterministic Input"
>>f) ParDo that frees the resources
>>
>> By making the use of the resource part of the data it is possible to
>> "checkpoint" which resources may be in use or have been finished by using
>> the require deterministic input. This is important to ensuring everything
>> is actually cleaned up.
>>
>>
>> I need that, but generic and not case by case, to industrialize some API on
>> top of Beam.
>>
>>
>>
>> 3. Some other use case that I may be missing? If it is this case, could
>> you elaborate on what you are trying to accomplish? That would help me
>> understand both the problems with existing options and possibly what could
>> be done to help.
>>
>>
>> I understand there are workarounds for almost all cases, but that means each
>> transform is different in its lifecycle handling. I dislike that a lot
>> at scale and as a user, since you can't put any unified practice on top of
>> Beam; it also makes Beam very hard to integrate or to use to build
>> higher-level libraries or software.
>>
>> This is why I tried to not start the workaround discussions and just stay
>> at the API level.
>>
>>
>>
>> -- Ben
>>
>>
>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau 
>> wrote:
>>
>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov :
>>>
 "Machine state" is overly low-level because many of the possible
 reasons can happen on a perfectly fine machine.
 If you'd like to rephrase it to "it will be called except in various
 situations where it's logically impossible or impractical to guarantee that
 it's called", that's fine. Or you can list some of the examples above.

>>>
>>> Sounds ok to me
>>>
>>>

 The main point for the user is, you *will* see non-preventable
 situations where it couldn't be called - it's not just intergalactic
 crashes - so if the logic is very important (e.g. cleaning up a large
 amount of temporary files, shutting down a large number of VMs you started
 etc), you have to express it using one of the other methods that have
 stricter guarantees (which obviously come at a cost, e.g. no
 pass-by-reference).

>>>
>>> FinishBundle has the exact same guarantee sadly, so I am not sure which
>>> other method you speak about. Concretely, if you make it really unreliable -
>>> this is what best effort sounds like to me - then users cannot use it to
>>> clean anything, but if you make it "can happen, but it is unexpected and means something

Re: @TearDown guarantees

2018-02-18 Thread Eugene Kirpichov
On Sun, Feb 18, 2018 at 10:25 AM Romain Manni-Bucau 
wrote:

> 2018-02-18 19:19 GMT+01:00 Eugene Kirpichov :
>
>> FinishBundle has a stronger guarantee: if the pipeline succeeded, then it
>> has been called for every succeeded bundle, and succeeded bundles together
>> cover the entire input PCollection. Of course, it may not have been called
>> for failed bundles.
>> To anticipate a possible objection "why not also keep retrying Teardown
>> until it succeeds" - because if Teardown wasn't called on a DoFn instance,
>> it's because the instance no longer exists and there's nothing to call it
>> on.
>>
>> Please take a look at implementations of WriteFiles and BigQueryIO.read()
>> and write() to see how cleanup of heavyweight resources (large number of
>> temp files, temporary BigQuery datasets) can be achieved reliably to the
>> extent possible.
>>
>
> Do you mean passing state across the fns and having a fn responsible for
> the cleanup? Kind of making the teardown a processElement? This is a nice
> workaround but it is not always possible, as mentioned. Ismael even has a
> nice case where this just fails and teardown would work - it was with AWS,
> not a BigQuery bug, but the same design.
>
I don't remember this case and would appreciate being reminded what it is.

But in general, yes, there unfortunately exist systems designed in a way
that reliably achieving cleanup when interacting with them from a
fault-tolerant distributed system like Beam is simply impossible. We should
consider on a case-by-case basis whether any given system is like this, and
what to do if it is.



>
>
>>
>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau 
>> wrote:
>>
>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov :
>>>
 "Machine state" is overly low-level because many of the possible
 reasons can happen on a perfectly fine machine.
 If you'd like to rephrase it to "it will be called except in various
 situations where it's logically impossible or impractical to guarantee that
 it's called", that's fine. Or you can list some of the examples above.

>>>
>>> Sounds ok to me
>>>
>>>

 The main point for the user is, you *will* see non-preventable
 situations where it couldn't be called - it's not just intergalactic
 crashes - so if the logic is very important (e.g. cleaning up a large
 amount of temporary files, shutting down a large number of VMs you started
 etc), you have to express it using one of the other methods that have
 stricter guarantees (which obviously come at a cost, e.g. no
 pass-by-reference).

>>>
>>> FinishBundle has the exact same guarantee sadly, so I am not sure which
>>> other method you speak about. Concretely, if you make it really unreliable -
>>> this is what best effort sounds like to me - then users cannot use it to
>>> clean anything, but if you make it "can happen, but it is unexpected and
>>> means something happened", then it is fine to have a manual - or auto if
>>> fancy - recovery procedure. This is where it makes all the difference and
>>> impacts the developers and ops (all users basically).
>>>
>>>

 On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <
 rmannibu...@gmail.com> wrote:

> Agree Eugene except that "best effort" means that. It is also often
> used to say "at will" and this is what triggered this thread.
>
> I'm fine using "except if the machine state prevents it" but "best
> effort" is too open and can be very badly and wrongly perceived by users
> (like I did).
>
>
> Romain Manni-Bucau
> @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book
>
>
> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov :
>
>> It will not be called if it's impossible to call it: in the example
>> situation you have (intergalactic crash), and in a number of more common
>> cases: eg in case the worker container has crashed (eg user code in a
>> different thread called a C library over JNI and it segfaulted), JVM bug,
>> crash due to user code OOM, in case the worker has lost network
>> connectivity (then it may be called but it won't be able to do anything
>> useful), in case this is running on a preemptible VM and it was preempted
>> by the underlying cluster manager without notice or if the worker was too
>> busy with other stuff (eg calling other Teardown functions) until the
>> preemption timeout elapsed, in case the underlying hardware simply failed
>> (which happens quite often at scale), and in many other conditions.
>>
>> "Best effort" is the commonly used 

Re: @TearDown guarantees

2018-02-18 Thread Romain Manni-Bucau
On 18 Feb 2018 19:28, "Ben Chambers" wrote:

It feels like this thread may be a bit off-track. Rather than focusing on
the semantics of the existing methods -- which have been noted to meet
many existing use cases -- it would be helpful to focus more on the
reason you are looking for something with different semantics.

Some possibilities (I'm not sure which one you are trying to do):

1. Clean-up some external, global resource, that was initialized once
during the startup of the pipeline. If this is the case, how are you
ensuring it was really only initialized once (and not once per worker, per
thread, per instance, etc.)? How do you know when the pipeline should
release it? If the answer is "when it reaches step X", then what about a
streaming pipeline?


When the DoFn is no longer needed logically, i.e. when the batch is done or
the stream is stopped (manually or by a JVM shutdown)



2. Finalize some resources that are used within some region of the
pipeline. While the DoFn lifecycle methods are not a good fit for this
(they are focused on managing resources within the DoFn), you could model
this on how FileIO finalizes the files that it produced. For instance:
   a) ParDo generates "resource IDs" (or some token that stores information
about resources)
   b) "Require Deterministic Input" (to prevent retries from changing
resource IDs)
   c) ParDo that initializes the resources
   d) Pipeline segments that use the resources, and eventually output the
fact they're done
   e) "Require Deterministic Input"
   f) ParDo that frees the resources

By making the use of the resource part of the data it is possible to
"checkpoint" which resources may be in use or have been finished by using
the require deterministic input. This is important to ensuring everything
is actually cleaned up.
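
For illustration, a minimal sketch of the "Require Deterministic Input" steps
(b)/(e) above. The closest concrete counterpart in the Java SDK is the
@RequiresStableInput annotation; runner support for it varies, so treat this as
the shape of the idea rather than a guarantee, and the resource handling is a
placeholder:

  import org.apache.beam.sdk.transforms.DoFn;

  class InitializeResourceFn extends DoFn<String, String> {
    @RequiresStableInput  // a retried bundle must see the same resource IDs as the first attempt
    @ProcessElement
    public void process(ProcessContext c) {
      String resourceId = c.element();  // deterministic ID generated by the upstream ParDo (step a)
      // create/initialize the external resource identified by resourceId (placeholder)
      c.output(resourceId);             // downstream segments use it; a symmetric ParDo frees it (step f)
    }
  }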


I need that, but generic and not case by case, to industrialize some API on
top of Beam.



3. Some other use case that I may be missing? If it is this case, could you
elaborate on what you are trying to accomplish? That would help me
understand both the problems with existing options and possibly what could
be done to help.


I understand there are workarounds for almost all cases, but that means each
transform is different in its lifecycle handling. I dislike that a lot at scale
and as a user, since you can't put any unified practice on top of Beam; it also
makes Beam very hard to integrate or to use to build higher-level libraries or
software.

This is why I tried to not start the workaround discussions and just stay at
the API level.



-- Ben


On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau 
wrote:

> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov :
>
>> "Machine state" is overly low-level because many of the possible reasons
>> can happen on a perfectly fine machine.
>> If you'd like to rephrase it to "it will be called except in various
>> situations where it's logically impossible or impractical to guarantee that
>> it's called", that's fine. Or you can list some of the examples above.
>>
>
> Sounds ok to me
>
>
>>
>> The main point for the user is, you *will* see non-preventable situations
>> where it couldn't be called - it's not just intergalactic crashes - so if
>> the logic is very important (e.g. cleaning up a large amount of temporary
>> files, shutting down a large number of VMs you started etc), you have to
>> express it using one of the other methods that have stricter guarantees
>> (which obviously come at a cost, e.g. no pass-by-reference).
>>
>
> FinishBundle has the exact same guarantee sadly, so I am not sure which
> other method you speak about. Concretely, if you make it really unreliable -
> this is what best effort sounds like to me - then users cannot use it to
> clean anything, but if you make it "can happen, but it is unexpected and
> means something happened", then it is fine to have a manual - or auto if
> fancy - recovery procedure. This is where it makes all the difference and
> impacts the developers and ops (all users basically).
>
>
>>
>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau 
>> wrote:
>>
>>> Agree Eugene except that "best effort" means that. It is also often used
>>> to say "at will" and this is what triggered this thread.
>>>
>>> I'm fine using "except if the machine state prevents it" but "best
>>> effort" is too open and can be very badly and wrongly perceived by users
>>> (like I did).
>>>
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book
>>>
>>>
>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov :
>>>
 It will not be called if it's impossible to call it: in the example
 

Re: @TearDown guarantees

2018-02-18 Thread Romain Manni-Bucau
On 18 Feb 2018 00:23, "Kenneth Knowles" wrote:

On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau 
wrote:
>
> If you give an example of a high-level need (e.g. "I'm trying to write an
> IO for system $x and it requires the following initialization and the
> following cleanup logic and the following processing in between") I'll be
> better able to help you.
>
>
> Take a simple example of a transform requiring a connection. Using bundles
> is a perf killer since their size is not controlled. Using teardown doesn't
> allow you to release the connection since it is a best-effort thing. Not
> releasing the connection makes you pay a lot - AWS ;) - or prevents you from
> launching other processing - concurrency limit.
>

For this example @Teardown is an exact fit. If things die so badly that
@Teardown is not called then nothing else can be called to close the
connection either. What AWS service are you thinking of that stays open for
a long time when everything at the other end has died?


You assume connections are kind of stateless, but some (proprietary)
protocols require some closing exchanges which are not only "I'm leaving".
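
For illustration, a minimal sketch of that case; ProtocolClient below is a
made-up stand-in for such a protocol, only the shape matters:

  import org.apache.beam.sdk.transforms.DoFn;

  /** Stand-in for a proprietary client whose protocol needs an explicit closing exchange. */
  class ProtocolClient {
    static ProtocolClient open() { return new ProtocolClient(); }
    String call(String request) { return request; }
    void sayGoodbye() { /* send the protocol's closing message */ }
    void shutdown() { /* tear down the socket */ }
  }

  class ConnectedFn extends DoFn<String, String> {
    private transient ProtocolClient client;

    @Setup
    public void setup() {
      client = ProtocolClient.open();  // expensive, reused across bundles
    }

    @ProcessElement
    public void process(ProcessContext c) {
      c.output(client.call(c.element()));
    }

    @Teardown
    public void teardown() {
      client.sayGoodbye();             // the closing exchange mentioned above
      client.shutdown();               // if this never runs, the server-side session leaks
    }
  }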

For AWS I was thinking about starting some services - machines - on the fly at
pipeline startup and closing them at the end. If teardown is not called, you
leak machines and money. You can say it can be done another way... as can the
full pipeline ;).

I don't want to be picky, but if Beam can't handle its components' lifecycle,
it can't be used at scale for generic pipelines and stays bound to some
particular IOs.

What prevents enforcing teardown - ignoring the interstellar-crash case, which
can't be handled by any human system? Nothing, technically. Why do you push to
not handle it? Is it due to some legacy code on Dataflow or something else?

Also, what does it mean for the users? The direct runner does it, so if a user
uses the RI in tests, he will get a different behavior in prod? Also, don't
forget the user doesn't know what the IOs he composes use, so this is so
impactful for the whole product that it must be handled IMHO.

I understand the portability culture is new in the big data world, but it is
not a reason to ignore what people did for years and do it wrong before doing
it right ;).

My proposal is to list what can prevent guaranteeing - in normal IT conditions -
the execution of teardown. Then we see if we can handle it, and only if there is
a technical reason we can't do we make it experimental/unsupported in the API.
I know Spark and Flink can; any unknown blocker for other runners?

Technical note: even a kill should go through Java shutdown hooks, otherwise
your environment (the software enclosing Beam) is fully unhandled and your
overall system is uncontrolled. The only case where this is not true is when
the software is always owned by a vendor and never installed on a customer
environment. In this case it belongs to the vendor to handle the Beam API and
not to Beam to adjust its API for a vendor - otherwise all features unsupported
by one runner should be made optional, right?
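
For reference, the JVM mechanism referred to here: a hook like this runs on an
orderly shutdown (e.g. SIGTERM or a normal exit) but not on SIGKILL or a
hardware failure:

  Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    // last-chance cleanup: close connections, release leased machines, flush logs, ...
    System.err.println("JVM shutting down, releasing resources");
  }));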

Not all state is about the network, even in distributed systems, so it is key
to have an explicit and defined lifecycle.


Kenn


Re: @TearDown guarantees

2018-02-18 Thread Romain Manni-Bucau
2018-02-18 18:36 GMT+01:00 Eugene Kirpichov :

> "Machine state" is overly low-level because many of the possible reasons
> can happen on a perfectly fine machine.
> If you'd like to rephrase it to "it will be called except in various
> situations where it's logically impossible or impractical to guarantee that
> it's called", that's fine. Or you can list some of the examples above.
>

Sounds ok to me


>
> The main point for the user is, you *will* see non-preventable situations
> where it couldn't be called - it's not just intergalactic crashes - so if
> the logic is very important (e.g. cleaning up a large amount of temporary
> files, shutting down a large number of VMs you started etc), you have to
> express it using one of the other methods that have stricter guarantees
> (which obviously come at a cost, e.g. no pass-by-reference).
>

FinishBundle has the exact same guarantee sadly, so I am not sure which other
method you speak about. Concretely, if you make it really unreliable - this is
what best effort sounds like to me - then users cannot use it to clean
anything, but if you make it "can happen, but it is unexpected and means
something happened", then it is fine to have a manual - or auto if fancy -
recovery procedure. This is where it makes all the difference and impacts the
developers and ops (all users basically).


>
> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau 
> wrote:
>
>> Agree Eugene except that "best effort" means that. It is also often used
>> to say "at will" and this is what triggered this thread.
>>
>> I'm fine using "except if the machine state prevents it" but "best
>> effort" is too open and can be very badly and wrongly perceived by users
>> (like I did).
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book
>>
>>
>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov :
>>
>>> It will not be called if it's impossible to call it: in the example
>>> situation you have (intergalactic crash), and in a number of more common
>>> cases: eg in case the worker container has crashed (eg user code in a
>>> different thread called a C library over JNI and it segfaulted), JVM bug,
>>> crash due to user code OOM, in case the worker has lost network
>>> connectivity (then it may be called but it won't be able to do anything
>>> useful), in case this is running on a preemptible VM and it was preempted
>>> by the underlying cluster manager without notice or if the worker was too
>>> busy with other stuff (eg calling other Teardown functions) until the
>>> preemption timeout elapsed, in case the underlying hardware simply failed
>>> (which happens quite often at scale), and in many other conditions.
>>>
>>> "Best effort" is the commonly used term to describe such behavior.
>>> Please feel free to file bugs for cases where you observed a runner not
>>> call Teardown in a situation where it was possible to call it but the
>>> runner made insufficient effort.
>>>
>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau 
>>> wrote:
>>>
 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov :

>
>
> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <
> rmannibu...@gmail.com> wrote:
>
>>
>>
>> On 18 Feb 2018 00:23, "Kenneth Knowles" wrote:
>>
>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <
>> rmannibu...@gmail.com> wrote:
>>>
>>> If you give an example of a high-level need (e.g. "I'm trying to
>>> write an IO for system $x and it requires the following initialization 
>>> and
>>> the following cleanup logic and the following processing in between") 
>>> I'll
>>> be better able to help you.
>>>
>>>
>>> Take a simple example of a transform requiring a connection. Using
>>> bundles is a perf killer since their size is not controlled. Using teardown
>>> doesn't allow you to release the connection since it is a best-effort thing.
>>> Not releasing the connection makes you pay a lot - AWS ;) - or prevents you
>>> from launching other processing - concurrency limit.
>>>
>>
>> For this example @Teardown is an exact fit. If things die so badly
>> that @Teardown is not called then nothing else can be called to close the
>> connection either. What AWS service are you thinking of that stays open 
>> for
>> a long time when everything at the other end has died?
>>
>>
>> You assume connections are kind of stateless, but some (proprietary)
>> protocols require some closing exchanges which are not only "I'm leaving".
>>
>> For aws i was 

Re: @TearDown guarantees

2018-02-18 Thread Eugene Kirpichov
FinishBundle has a stronger guarantee: if the pipeline succeeded, then it
has been called for every succeeded bundle, and succeeded bundles together
cover the entire input PCollection. Of course, it may not have been called
for failed bundles.
To anticipate a possible objection "why not also keep retrying Teardown
until it succeeds" - because if Teardown wasn't called on a DoFn instance,
it's because the instance no longer exists and there's nothing to call it
on.
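
For illustration, the common pattern that leans on exactly that guarantee:
buffer per bundle and flush in @FinishBundle. The actual write call is a
placeholder and should be idempotent, since failed bundles are retried:

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.beam.sdk.transforms.DoFn;

  class BufferedWriteFn extends DoFn<String, Void> {
    private transient List<String> buffer;

    @StartBundle
    public void startBundle() {
      buffer = new ArrayList<>();
    }

    @ProcessElement
    public void process(ProcessContext c) {
      buffer.add(c.element());
    }

    @FinishBundle
    public void finishBundle() {
      // flush 'buffer' to the external system here (placeholder); if the pipeline
      // succeeds, this ran for every bundle that made up the input PCollection
      buffer.clear();
    }
  }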

Please take a look at implementations of WriteFiles and BigQueryIO.read()
and write() to see how cleanup of heavyweight resources (large number of
temp files, temporary BigQuery datasets) can be achieved reliably to the
extent possible.

On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau 
wrote:

> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov :
>
>> "Machine state" is overly low-level because many of the possible reasons
>> can happen on a perfectly fine machine.
>> If you'd like to rephrase it to "it will be called except in various
>> situations where it's logically impossible or impractical to guarantee that
>> it's called", that's fine. Or you can list some of the examples above.
>>
>
> Sounds ok to me
>
>
>>
>> The main point for the user is, you *will* see non-preventable situations
>> where it couldn't be called - it's not just intergalactic crashes - so if
>> the logic is very important (e.g. cleaning up a large amount of temporary
>> files, shutting down a large number of VMs you started etc), you have to
>> express it using one of the other methods that have stricter guarantees
>> (which obviously come at a cost, e.g. no pass-by-reference).
>>
>
> FinishBundle has the exact same guarantee sadly, so I am not sure which
> other method you speak about. Concretely, if you make it really unreliable -
> this is what best effort sounds like to me - then users cannot use it to
> clean anything, but if you make it "can happen, but it is unexpected and
> means something happened", then it is fine to have a manual - or auto if
> fancy - recovery procedure. This is where it makes all the difference and
> impacts the developers and ops (all users basically).
>
>
>>
>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau 
>> wrote:
>>
>>> Agree Eugene except that "best effort" means that. It is also often used
>>> to say "at will" and this is what triggered this thread.
>>>
>>> I'm fine using "except if the machine state prevents it" but "best
>>> effort" is too open and can be very badly and wrongly perceived by users
>>> (like I did).
>>>
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book
>>>
>>>
>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov :
>>>
 It will not be called if it's impossible to call it: in the example
 situation you have (intergalactic crash), and in a number of more common
 cases: eg in case the worker container has crashed (eg user code in a
 different thread called a C library over JNI and it segfaulted), JVM bug,
 crash due to user code OOM, in case the worker has lost network
 connectivity (then it may be called but it won't be able to do anything
 useful), in case this is running on a preemptible VM and it was preempted
 by the underlying cluster manager without notice or if the worker was too
 busy with other stuff (eg calling other Teardown functions) until the
 preemption timeout elapsed, in case the underlying hardware simply failed
 (which happens quite often at scale), and in many other conditions.

 "Best effort" is the commonly used term to describe such behavior.
 Please feel free to file bugs for cases where you observed a runner not
 call Teardown in a situation where it was possible to call it but the
 runner made insufficient effort.

 On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau 
 wrote:

> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov :
>
>>
>>
>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <
>> rmannibu...@gmail.com> wrote:
>>
>>>
>>>
>>> On 18 Feb 2018 at 00:23, "Kenneth Knowles" wrote:
>>>
>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:

 If you give an example of a high-level need (e.g. "I'm trying to
 write an IO for system $x and it requires the following initialization 
 and
 the following cleanup logic and the following processing in between") 
 I'll
 be better able to help you.


 Take a simple example of a 

Re: @TearDown guarantees

2018-02-18 Thread Romain Manni-Bucau
2018-02-18 19:19 GMT+01:00 Eugene Kirpichov :

> FinishBundle has a stronger guarantee: if the pipeline succeeded, then it
> has been called for every succeeded bundle, and succeeded bundles together
> cover the entire input PCollection. Of course, it may not have been called
> for failed bundles.
> To anticipate a possible objection "why not also keep retrying Teardown
> until it succeeds" - because if Teardown wasn't called on a DoFn instance,
> it's because the instance no longer exists and there's nothing to call it
> on.
>
> Please take a look at implementations of WriteFiles and BigQueryIO.read()
> and write() to see how cleanup of heavyweight resources (large number of
> temp files, temporary BigQuery datasets) can be achieved reliably to the
> extent possible.
>

Do you mean passing state across the fns and having one fn responsible for the
cleanup? Kind of making the teardown a processElement? This is a nice
workaround but it is not always possible, as mentioned. Ismael even has a
nice case where this just fails and teardown would work - it was with AWS, not
a BigQuery bug, but the same design.


>
> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau 
> wrote:
>
>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov :
>>
>>> "Machine state" is overly low-level because many of the possible reasons
>>> can happen on a perfectly fine machine.
>>> If you'd like to rephrase it to "it will be called except in various
>>> situations where it's logically impossible or impractical to guarantee that
>>> it's called", that's fine. Or you can list some of the examples above.
>>>
>>
>> Sounds ok to me
>>
>>
>>>
>>> The main point for the user is, you *will* see non-preventable
>>> situations where it couldn't be called - it's not just intergalactic
>>> crashes - so if the logic is very important (e.g. cleaning up a large
>>> amount of temporary files, shutting down a large number of VMs you started
>>> etc), you have to express it using one of the other methods that have
>>> stricter guarantees (which obviously come at a cost, e.g. no
>>> pass-by-reference).
>>>
>>
>> FinishBundle has the exact same guarantee sadly so not which which other
>> method you speak about. Concretely if you make it really unreliable - this
>> is what best effort sounds to me - then users can use it to clean anything
>> but if you make it "can happen but it is unexpected and means something
>> happent" then it is fine to have a manual - or auto if fancy - recovery
>> procedure. This is where it makes all the difference and impacts the
>> developpers, ops (all users basically).
>>
>>
>>>
>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
 Agree Eugene except that "best effort" means that. It is also often
 used to say "at will" and this is what triggered this thread.

 I'm fine using "except if the machine state prevents it" but "best
 effort" is too open and can be very badly and wrongly perceived by users
 (like I did).



 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov :

> It will not be called if it's impossible to call it: in the example
> situation you have (intergalactic crash), and in a number of more common
> cases: eg in case the worker container has crashed (eg user code in a
> different thread called a C library over JNI and it segfaulted), JVM bug,
> crash due to user code OOM, in case the worker has lost network
> connectivity (then it may be called but it won't be able to do anything
> useful), in case this is running on a preemptible VM and it was preempted
> by the underlying cluster manager without notice or if the worker was too
> busy with other stuff (eg calling other Teardown functions) until the
> preemption timeout elapsed, in case the underlying hardware simply failed
> (which happens quite often at scale), and in many other conditions.
>
> "Best effort" is the commonly used term to describe such behavior.
> Please feel free to file bugs for cases where you observed a runner not
> call Teardown in a situation where it was possible to call it but the
> runner made insufficient effort.
>
> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <
> rmannibu...@gmail.com> wrote:
>
>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov :
>>
>>>
>>>
>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>


 Le 18 févr. 2018 

Re: @TearDown guarantees

2018-02-18 Thread Ben Chambers
It feels like this thread may be a bit off-track. Rather than focusing on
the semantics of the existing methods -- which have been noted to meet
many existing use cases -- it would be helpful to focus more on the
reason you are looking for something with different semantics.

Some possibilities (I'm not sure which one you are trying to do):

1. Clean-up some external, global resource, that was initialized once
during the startup of the pipeline. If this is the case, how are you
ensuring it was really only initialized once (and not once per worker, per
thread, per instance, etc.)? How do you know when the pipeline should
release it? If the answer is "when it reaches step X", then what about a
streaming pipeline?

2. Finalize some resources that are used within some region of the
pipeline. While, the DoFn lifecycle methods are not a good fit for this
(they are focused on managing resources within the DoFn), you could model
this on how FileIO finalizes the files that it produced. For instance:
   a) ParDo generates "resource IDs" (or some token that stores information
about resources)
   b) "Require Deterministic Input" (to prevent retries from changing
resource IDs)
   c) ParDo that initializes the resources
   d) Pipeline segments that use the resources, and eventually output the
fact they're done
   e) "Require Deterministic Input"
   f) ParDo that frees the resources

By making the use of the resource part of the data, it is possible to
"checkpoint" which resources may be in use or have been finished, by using
the require-deterministic-input steps. This is important to ensure everything
is actually cleaned up (a hedged sketch of this pattern follows after this
list).

3. Some other use case that I may be missing? If it is this case, could you
elaborate on what you are trying to accomplish? That would help me
understand both the problems with existing options and possibly what could
be done to help.
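
For possibility 2, a rough, hedged sketch of how steps (a)-(f) could look as a
pipeline fragment. Reshuffle.viaRandomKey() is used here as a stand-in for
"Require Deterministic Input"; the various *ResourceFn classes are
illustrative placeholders, not real Beam APIs, and the usual Beam imports
(ParDo, Reshuffle, PCollection) are assumed:

PCollection<String> resourceIds =
    input
        .apply("GenerateResourceIds", ParDo.of(new GenerateResourceIdFn())) // (a) emit ids/tokens
        .apply("StableIds", Reshuffle.viaRandomKey());                      // (b) retries keep the same ids

PCollection<String> finishedIds =
    resourceIds
        .apply("InitializeResources", ParDo.of(new CreateResourceFn()))     // (c)
        .apply("UseResources", ParDo.of(new UseResourceFn()))               // (d) outputs an id once done with it
        .apply("StableDone", Reshuffle.viaRandomKey());                     // (e)

finishedIds.apply("FreeResources", ParDo.of(new DeleteResourceFn()));       // (f) cleanup as ordinary pipeline work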

-- Ben


On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau 
wrote:

> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov :
>
>> "Machine state" is overly low-level because many of the possible reasons
>> can happen on a perfectly fine machine.
>> If you'd like to rephrase it to "it will be called except in various
>> situations where it's logically impossible or impractical to guarantee that
>> it's called", that's fine. Or you can list some of the examples above.
>>
>
> Sounds ok to me
>
>
>>
>> The main point for the user is, you *will* see non-preventable situations
>> where it couldn't be called - it's not just intergalactic crashes - so if
>> the logic is very important (e.g. cleaning up a large amount of temporary
>> files, shutting down a large number of VMs you started etc), you have to
>> express it using one of the other methods that have stricter guarantees
>> (which obviously come at a cost, e.g. no pass-by-reference).
>>
>
> FinishBundle has the exact same guarantee sadly so not which which other
> method you speak about. Concretely if you make it really unreliable - this
> is what best effort sounds to me - then users can use it to clean anything
> but if you make it "can happen but it is unexpected and means something
> happent" then it is fine to have a manual - or auto if fancy - recovery
> procedure. This is where it makes all the difference and impacts the
> developpers, ops (all users basically).
>
>
>>
>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau 
>> wrote:
>>
>>> Agree Eugene except that "best effort" means that. It is also often used
>>> to say "at will" and this is what triggered this thread.
>>>
>>> I'm fine using "except if the machine state prevents it" but "best
>>> effort" is too open and can be very badly and wrongly perceived by users
>>> (like I did).
>>>
>>>
>>>
>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov :
>>>
 It will not be called if it's impossible to call it: in the example
 situation you have (intergalactic crash), and in a number of more common
 cases: eg in case the worker container has crashed (eg user code in a
 different thread called a C library over JNI and it segfaulted), JVM bug,
 crash due to user code OOM, in case the worker has lost network
 connectivity (then it may be called but it won't be able to do anything
 useful), in case this is running on a preemptible VM and it was preempted
 by the underlying cluster manager without notice or if the worker was too
 busy with other stuff (eg calling other Teardown functions) until the
 preemption timeout elapsed, in case the underlying hardware simply failed
 (which happens 

Re: @TearDown guarantees

2018-02-18 Thread Romain Manni-Bucau
On 18 Feb 2018 at 15:39, "Jean-Baptiste Onofré" wrote:

Hi,

I think, as you said, it depends on the protocol and the IO.

For instance, in the first version of JdbcIO, I created the connections in
@Setup and released them in @Teardown.

But, in the case of a streaming system, that's not so good (especially for
pooling), as the connection stays open for a very long time.


Hmm, that can be debated in practice (both pooling and connection holding for
JDBC in a Beam context), but let's assume it.


So, I updated it to deal with the connection in @StartBundle and release it in
@FinishBundle.



Which leads to an unpredictable bundle size and therefore very, very bad write
performance - read size is faked by an in-memory buffer, I guess, which breaks
the bundle definition, but let's ignore that too for now.


So, I think it depends on the kind of connection: the kind actually holding
resources should be managed per bundle (at least for now); the other kind
(just wrapping configuration but not holding resources, like the Apache HTTP
Components client, for instance) can be dealt with in the DoFn lifecycle.
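
For concreteness, a minimal sketch of the two placements described above
(names are illustrative, error handling is omitted, and the DataSource
provider is assumed to be serializable):

import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.DataSource;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

// Kind 1: a client that mostly wraps configuration - DoFn lifecycle is enough.
class LifecycleScopedFn extends DoFn<String, String> {
  private transient CloseableHttpClient http;

  @Setup
  public void setup() {
    http = HttpClients.createDefault();
  }

  @ProcessElement
  public void process(ProcessContext c) {
    c.output(c.element()); // would use http here
  }

  @Teardown
  public void teardown() throws Exception {
    if (http != null) {
      http.close(); // best effort, as discussed in this thread
    }
  }
}

// Kind 2: a connection holding real server-side resources - bundle scope.
class BundleScopedFn extends DoFn<String, String> {
  private final SerializableFunction<Void, DataSource> dataSourceProvider;
  private transient Connection connection;

  BundleScopedFn(SerializableFunction<Void, DataSource> dataSourceProvider) {
    this.dataSourceProvider = dataSourceProvider;
  }

  @StartBundle
  public void startBundle() throws SQLException {
    connection = dataSourceProvider.apply(null).getConnection();
  }

  @ProcessElement
  public void process(ProcessContext c) {
    c.output(c.element()); // would use connection here
  }

  @FinishBundle
  public void finishBundle() throws SQLException {
    if (connection != null) {
      connection.close();
      connection = null;
    }
  }
}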



Once again, I would be OK with bundles for now - but it doesn't solve the real
issue - if bundle sizing were up to the user. Since it is not, it doesn't help
and can just degrade the overall behavior in both batch and streaming.

I fully understand Beam doesn't handle that properly today. What blocks us
from doing it? Nothing technical, so why not do it?

Technically:

1. Teardown can be guaranteed
2. Bundle size can be strongly influenced / configured by the user

Both are needed to be able to propose a strong API compared to competitors,
and to ensure that going portable does not bring only disadvantages for users.

Let's just do it, no?


Regards
JB

On 02/18/2018 11:05 AM, Romain Manni-Bucau wrote:
>
>
> On 18 Feb 2018 at 00:23, "Kenneth Knowles" wrote:
>
>     On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau
>     <rmannibu...@gmail.com> wrote:
>
>             If you give an example of a high-level need (e.g. "I'm trying to
>             write an IO for system $x and it requires the following
>             initialization and the following cleanup logic and the following
>             processing in between") I'll be better able to help you.
>
>         Take a simple example of a transform requiring a connection. Using
>         bundles is a perf killer since size is not controlled. Using
>         teardown doesn't allow you to release the connection since it is a
>         best-effort thing. Not releasing the connection makes you pay a lot
>         - AWS ;) - or prevents you from launching other processing -
>         concurrent limits.
>
>     For this example @Teardown is an exact fit. If things die so badly that
>     @Teardown is not called then nothing else can be called to close the
>     connection either. What AWS service are you thinking of that stays open
>     for a long time when everything at the other end has died?
>
> You assume connections are kind of stateless, but some (proprietary)
> protocols require some closing exchanges which are not only "I'm leaving".
>
> For AWS I was thinking about starting some services - machines - on the fly
> at pipeline startup and closing them at the end. If teardown is not called
> you leak machines and money. You can say it can be done another way... as
> can the full pipeline ;).
>
> I don't want to be picky, but if Beam can't handle its components'
> lifecycle, it can't be used at scale for generic pipelines and is bound to
> some particular IOs.
>
> What prevents enforcing teardown - ignoring the interstellar crash case,
> which can't be handled by any human system? Nothing technically. Why do you
> push to not handle it? Is it due to some legacy code on Dataflow or
> something else?
>
> Also, what does it mean for the users? The direct runner does it, so if a
> user uses the RI in tests, will he get a different behavior in prod? Also,
> don't forget the user doesn't know what the IOs he composes use, so this is
> so impacting for the whole product that it must be handled IMHO.
>
> I understand the portability culture is new in the big data world, but it
> is not a reason to ignore what people did for years and do it wrong before
> doing it right ;).
>
> My proposal is to list what can prevent guaranteeing - under normal IT
> conditions - the execution of teardown. Then we see if we can handle it,
> and only if there is a technical reason we can't do we make it
> experimental/unsupported in the API. I know Spark and Flink can; any
> unknown blocker for other runners?
>
> Technical note: even a kill should go through Java shutdown hooks,
> otherwise your environment (the software enclosing Beam) is fully unhandled
> and your overall system is uncontrolled. The only case where this is not
> true is when the software is always owned by a vendor and never installed
> on a customer environment. In that case it belongs to the vendor to handle
> the Beam API and not to Beam to adjust

Re: @TearDown guarantees

2018-02-18 Thread Romain Manni-Bucau
2018-02-18 18:00 GMT+01:00 Eugene Kirpichov :

>
>
> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau 
> wrote:
>
>>
>>
>> On 18 Feb 2018 at 00:23, "Kenneth Knowles" wrote:
>>
>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <
>> rmannibu...@gmail.com> wrote:
>>>
>>> If you give an example of a high-level need (e.g. "I'm trying to write
>>> an IO for system $x and it requires the following initialization and the
>>> following cleanup logic and the following processing in between") I'll be
>>> better able to help you.
>>>
>>>
>>> Take a simple example of a transform requiring a connection. Using
>>> bundles is a perf killer since size is not controlled. Using teardown
>>> doesnt allow you to release the connection since it is a best effort thing.
>>> Not releasing the connection makes you pay a lot - aws ;) - or prevents you
>>> to launch other processings - concurrent limit.
>>>
>>
>> For this example @Teardown is an exact fit. If things die so badly that
>> @Teardown is not called then nothing else can be called to close the
>> connection either. What AWS service are you thinking of that stays open for
>> a long time when everything at the other end has died?
>>
>>
>> You assume connections are kind of stateless but some (proprietary)
>> protocols requires some closing exchanges which are not only "im leaving".
>>
>> For aws i was thinking about starting some services - machines - on the
>> fly in a pipeline startup and closing them at the end. If teardown is not
>> called you leak machines and money. You can say it can be done another
>> way...as the full pipeline ;).
>>
>> I dont want to be picky but if beam cant handle its components lifecycle
>> it can be used at scale for generic pipelines and if bound to some
>> particular IO.
>>
>> What does prevent to enforce teardown - ignoring the interstellar crash
>> case which cant be handled by any human system? Nothing technically. Why do
>> you push to not handle it? Is it due to some legacy code on dataflow or
>> something else?
>>
> Teardown *is* already documented and implemented this way (best-effort).
> So I'm not sure what kind of change you're asking for.
>

Remove "best effort" from the javadoc. If it is not call then it is a bug
and we are done :).


>
>
>> Also what does it mean for the users? Direct runner does it so if a user
>> udes the RI in test, he will get a different behavior in prod? Also dont
>> forget the user doesnt know what the IOs he composes use so this is so
>> impacting for the whole product than he must be handled IMHO.
>>
>> I understand the portability culture is new in big data world but it is
>> not a reason to ignore what people did for years and do it wrong before
>> doing right ;).
>>
>> My proposal is to list what can prevent to guarantee - in the normal IT
>> conditions - the execution of teardown. Then we see if we can handle it and
>> only if there is a technical reason we cant we make it
>> experimental/unsupported in the api. I know spark and flink can, any
>> unknown blocker for other runners?
>>
>> Technical note: even a kill should go through java shutdown hooks
>> otherwise your environment (beam enclosing software) is fully unhandled and
>> your overall system is uncontrolled. Only case where it is not true is when
>> the software is always owned by a vendor and never installed on customer
>> environment. In this case it belongd to the vendor to handle beam API and
>> not to beam to adjust its API for a vendor - otherwise all unsupported
>> features by one runner should be made optional right?
>>
>> All state is not about network, even in distributed systems so this is
>> key to have an explicit and defined lifecycle.
>>
>>
>> Kenn
>>
>>
>>
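
As a side note on the "Java shutdown hooks" point quoted above, a hedged
sketch of what that refers to: a plain kill (SIGTERM, not kill -9) lets the
JVM run registered shutdown hooks, which is one place an enclosing
application could attempt best-effort cleanup.

public final class ShutdownHookExample {
  public static void main(String[] args) throws InterruptedException {
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      // Release external resources here: close connections, delete temp state, etc.
      System.err.println("shutdown hook: cleaning up");
    }));
    Thread.sleep(60_000); // simulate work; send SIGTERM to see the hook run
  }
}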


Re: @TearDown guarantees

2018-02-18 Thread Eugene Kirpichov
"Machine state" is overly low-level because many of the possible reasons
can happen on a perfectly fine machine.
If you'd like to rephrase it to "it will be called except in various
situations where it's logically impossible or impractical to guarantee that
it's called", that's fine. Or you can list some of the examples above.

The main point for the user is, you *will* see non-preventable situations
where it couldn't be called - it's not just intergalactic crashes - so if
the logic is very important (e.g. cleaning up a large amount of temporary
files, shutting down a large number of VMs you started etc), you have to
express it using one of the other methods that have stricter guarantees
(which obviously come at a cost, e.g. no pass-by-reference).
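
As a sketch (not the actual Beam javadoc), the rephrasing above could look
roughly like this on the @Teardown method:

/**
 * Annotation for the method used to clean up this DoFn instance before it is
 * discarded.
 *
 * <p>Sketch of the wording discussed in this thread: the runner will call
 * this method except in situations where it is logically impossible or
 * impractical to do so, for example if the worker crashed, ran out of
 * memory, lost network connectivity, or was preempted before the call could
 * happen. Cleanup whose execution must be guaranteed (e.g. deleting many
 * temporary files or shutting down VMs you started) should instead be
 * expressed as pipeline steps or per-bundle work.
 */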

On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau 
wrote:

> Agree Eugene except that "best effort" means that. It is also often used
> to say "at will" and this is what triggered this thread.
>
> I'm fine using "except if the machine state prevents it" but "best effort"
> is too open and can be very badly and wrongly perceived by users (like I
> did).
>
>
>
> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov :
>
>> It will not be called if it's impossible to call it: in the example
>> situation you have (intergalactic crash), and in a number of more common
>> cases: eg in case the worker container has crashed (eg user code in a
>> different thread called a C library over JNI and it segfaulted), JVM bug,
>> crash due to user code OOM, in case the worker has lost network
>> connectivity (then it may be called but it won't be able to do anything
>> useful), in case this is running on a preemptible VM and it was preempted
>> by the underlying cluster manager without notice or if the worker was too
>> busy with other stuff (eg calling other Teardown functions) until the
>> preemption timeout elapsed, in case the underlying hardware simply failed
>> (which happens quite often at scale), and in many other conditions.
>>
>> "Best effort" is the commonly used term to describe such behavior. Please
>> feel free to file bugs for cases where you observed a runner not call
>> Teardown in a situation where it was possible to call it but the runner
>> made insufficient effort.
>>
>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau 
>> wrote:
>>
>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov :
>>>


 On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau 
 wrote:

>
>
> On 18 Feb 2018 at 00:23, "Kenneth Knowles" wrote:
>
> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <
> rmannibu...@gmail.com> wrote:
>>
>> If you give an example of a high-level need (e.g. "I'm trying to
>> write an IO for system $x and it requires the following initialization 
>> and
>> the following cleanup logic and the following processing in between") 
>> I'll
>> be better able to help you.
>>
>>
>> Take a simple example of a transform requiring a connection. Using
>> bundles is a perf killer since size is not controlled. Using teardown
>> doesnt allow you to release the connection since it is a best effort 
>> thing.
>> Not releasing the connection makes you pay a lot - aws ;) - or prevents 
>> you
>> to launch other processings - concurrent limit.
>>
>
> For this example @Teardown is an exact fit. If things die so badly
> that @Teardown is not called then nothing else can be called to close the
> connection either. What AWS service are you thinking of that stays open 
> for
> a long time when everything at the other end has died?
>
>
> You assume connections are kind of stateless but some (proprietary)
> protocols requires some closing exchanges which are not only "im leaving".
>
> For aws i was thinking about starting some services - machines - on
> the fly in a pipeline startup and closing them at the end. If teardown is
> not called you leak machines and money. You can say it can be done another
> way...as the full pipeline ;).
>
> I dont want to be picky but if beam cant handle its components
> lifecycle it can be used at scale for generic pipelines and if bound to
> some particular IO.
>
> What does prevent to enforce teardown - ignoring the interstellar
> crash case which cant be handled by any human system? Nothing technically.
> Why do you push to not handle it? Is it due to some legacy code on 
> dataflow
> or 

Re: @TearDown guarantees

2018-02-18 Thread Jean-Baptiste Onofré

My bad, I thought you were talking about a guarantee in the Runner API.

If it's a semantic point in the SDK (enforcement instead of best effort),
and then if a runner doesn't respect that, it's a limitation/bug in
that runner - I would agree with that.


Regards
JB

On 18/02/2018 16:58, Romain Manni-Bucau wrote:



On 18 Feb 2018 at 15:39, "Jean-Baptiste Onofré" wrote:


Hi,

I think, as you said, it depends of the protocol and the IO.

For instance, in first version of JdbcIO, I created the connections
in @Setup
and released in @Teardown.

But, in case of streaming system, it's not so good (especially for
pooling) as
the connection stays open for a very long time.


Hmm can be discussed in practise (both pooling and connection holding 
for jdbc in beam context) but lets assume it.



So, I updated to deal with connection in @StartBundle and release in
@FinishBundle.



Which leads to an unpredictable bundle size and therefore very very bad 
perfs on write size - read size is faked but in mem buffer i guess which 
breaks the bundle definition but let s ignore it too for now.



So, I think it depends of the kind of connections: the kind of
connection
actually holding resources should be manage in bundle (at least for
now), the
other kind of connection (just wrapping configuration but not
holding resources
like Apache HTTP Component Client for instance) could be dealt in
DoFn lifecycle.



Once again, I would be ok with bundles for now - but it doesnt solve the 
real issue - if bundles are up to the user. Since it is not, it doesnt 
help and can just degrade the overall behavior in both batch and streaming.


I fully understand beam doesnt handle properly that today. What does 
block to do it? Nothing technical so why not doing it?


Technically:

1. Teardown can be guaranteed
2. Bundle size can be highly influenced / configured by user

Both are needed to be able to propose a strong api compared to 
competitors and aims to not only have disavantages going portable for users.


Let just do it, no?


Regards
JB

On 02/18/2018 11:05 AM, Romain Manni-Bucau wrote:
 >
 >
 >  > On 18 Feb 2018 at 00:23, "Kenneth Knowles" wrote:
 >
 >     On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau

 >     >> wrote:
 >
 >             If you give an example of a high-level need (e.g.
"I'm trying to
 >             write an IO for system $x and it requires the following
 >             initialization and the following cleanup logic and
the following
 >             processing in between") I'll be better able to help you.
 >
 >
 >         Take a simple example of a transform requiring a
connection. Using
 >         bundles is a perf killer since size is not controlled.
Using teardown
 >         doesnt allow you to release the connection since it is a
best effort
 >         thing. Not releasing the connection makes you pay a lot -
aws ;) - or
 >         prevents you to launch other processings - concurrent limit.
 >
 >
 >     For this example @Teardown is an exact fit. If things die so
badly that
 >     @Teardown is not called then nothing else can be called to
close the
 >     connection either. What AWS service are you thinking of that
stays open for
 >     a long time when everything at the other end has died?
 >
 >
 > You assume connections are kind of stateless but some
(proprietary) protocols
 > requires some closing exchanges which are not only "im leaving".
 >
 > For aws i was thinking about starting some services - machines -
on the fly in a
 > pipeline startup and closing them at the end. If teardown is not
called you leak
 > machines and money. You can say it can be done another way...as
the full
 > pipeline ;).
 >
 > I dont want to be picky but if beam cant handle its components
lifecycle it can
 > be used at scale for generic pipelines and if bound to some
particular IO.
 >
 > What does prevent to enforce teardown - ignoring the interstellar
crash case
 > which cant be handled by any human system? Nothing technically.
Why do you push
 > to not handle it? Is it due to some legacy code on dataflow or
something else?
 >
 > Also what does it mean for the users? Direct runner does it so if
a user udes
 > the RI in test, he will get a different behavior in prod? Also
dont forget the
 > user doesnt know what the IOs he composes use so this is so
impacting for the
 > whole product than he must be handled IMHO.
 >

Re: @TearDown guarantees

2018-02-18 Thread Romain Manni-Bucau
Yes, exactly, JB. I just want to ensure the SDK/core API is clear and well
defined, and that any failure to respect it counts as a runner bug. What I
don't want is a buggy implementation leaking into the SDK/core definition.



2018-02-18 17:56 GMT+01:00 Jean-Baptiste Onofré :

> My bad, I thought you talked about guarantee in the Runner API.
>
> If it's semantic point in the SDK (enforcement instead of best effort),
> and then if the runner doesn't respect that, it's a limitation/bug in the
> runner, I would agree with that.
>
> Regards
> JB
>
> On 18/02/2018 16:58, Romain Manni-Bucau wrote:
>
>>
>>
>> On 18 Feb 2018 at 15:39, "Jean-Baptiste Onofré" <j...@nanthrax.net> wrote:
>>
>> Hi,
>>
>> I think, as you said, it depends of the protocol and the IO.
>>
>> For instance, in first version of JdbcIO, I created the connections
>> in @Setup
>> and released in @Teardown.
>>
>> But, in case of streaming system, it's not so good (especially for
>> pooling) as
>> the connection stays open for a very long time.
>>
>>
>> Hmm can be discussed in practise (both pooling and connection holding for
>> jdbc in beam context) but lets assume it.
>>
>>
>> So, I updated to deal with connection in @StartBundle and release in
>> @FinishBundle.
>>
>>
>>
>> Which leads to an unpredictable bundle size and therefore very very bad
>> perfs on write size - read size is faked but in mem buffer i guess which
>> breaks the bundle definition but let s ignore it too for now.
>>
>>
>> So, I think it depends of the kind of connections: the kind of
>> connection
>> actually holding resources should be manage in bundle (at least for
>> now), the
>> other kind of connection (just wrapping configuration but not
>> holding resources
>> like Apache HTTP Component Client for instance) could be dealt in
>> DoFn lifecycle.
>>
>>
>>
>> Once again, I would be ok with bundles for now - but it doesnt solve the
>> real issue - if bundles are up to the user. Since it is not, it doesnt help
>> and can just degrade the overall behavior in both batch and streaming.
>>
>> I fully understand beam doesnt handle properly that today. What does
>> block to do it? Nothing technical so why not doing it?
>>
>> Technically:
>>
>> 1. Teardown can be guaranteed
>> 2. Bundle size can be highly influenced / configured by user
>>
>> Both are needed to be able to propose a strong api compared to
>> competitors and aims to not only have disavantages going portable for users.
>>
>> Let just do it, no?
>>
>>
>> Regards
>> JB
>>
>> On 02/18/2018 11:05 AM, Romain Manni-Bucau wrote:
>>  >
>>  >
>>  > On 18 Feb 2018 at 00:23, "Kenneth Knowles" wrote:
>>  >
>>  > On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau
>> 
>>  > >
>> >> wrote:
>>  >
>>  > If you give an example of a high-level need (e.g.
>> "I'm trying to
>>  > write an IO for system $x and it requires the following
>>  > initialization and the following cleanup logic and
>> the following
>>  > processing in between") I'll be better able to help
>> you.
>>  >
>>  >
>>  > Take a simple example of a transform requiring a
>> connection. Using
>>  > bundles is a perf killer since size is not controlled.
>> Using teardown
>>  > doesnt allow you to release the connection since it is a
>> best effort
>>  > thing. Not releasing the connection makes you pay a lot -
>> aws ;) - or
>>  > prevents you to launch other processings - concurrent
>> limit.
>>  >
>>  >
>>  > For this example @Teardown is an exact fit. If things die so
>> badly that
>>  > @Teardown is not called then nothing else can be called to
>> close the
>>  > connection either. What AWS service are you thinking of that
>> stays open for
>>  > a long time when everything at the other end has died?
>>  >
>>  >
>>  > You assume connections are kind of stateless but some
>> (proprietary) protocols
>>  > requires some closing exchanges which are not only "im leaving".
>>  >
>>  > For aws i was thinking about starting some services - machines -
>> on the fly in a
>>  > pipeline startup and closing them at the end. If teardown 

Re: @TearDown guarantees

2018-02-18 Thread Eugene Kirpichov
It will not be called if it's impossible to call it: in the example
situation you have (intergalactic crash), and in a number of more common
cases: eg in case the worker container has crashed (eg user code in a
different thread called a C library over JNI and it segfaulted), JVM bug,
crash due to user code OOM, in case the worker has lost network
connectivity (then it may be called but it won't be able to do anything
useful), in case this is running on a preemptible VM and it was preempted
by the underlying cluster manager without notice or if the worker was too
busy with other stuff (eg calling other Teardown functions) until the
preemption timeout elapsed, in case the underlying hardware simply failed
(which happens quite often at scale), and in many other conditions.

"Best effort" is the commonly used term to describe such behavior. Please
feel free to file bugs for cases where you observed a runner not call
Teardown in a situation where it was possible to call it but the runner
made insufficient effort.

On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau 
wrote:

> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov :
>
>>
>>
>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau 
>> wrote:
>>
>>>
>>>
>>> On 18 Feb 2018 at 00:23, "Kenneth Knowles" wrote:
>>>
>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:

 If you give an example of a high-level need (e.g. "I'm trying to write
 an IO for system $x and it requires the following initialization and the
 following cleanup logic and the following processing in between") I'll be
 better able to help you.


 Take a simple example of a transform requiring a connection. Using
 bundles is a perf killer since size is not controlled. Using teardown
 doesnt allow you to release the connection since it is a best effort thing.
 Not releasing the connection makes you pay a lot - aws ;) - or prevents you
 to launch other processings - concurrent limit.

>>>
>>> For this example @Teardown is an exact fit. If things die so badly that
>>> @Teardown is not called then nothing else can be called to close the
>>> connection either. What AWS service are you thinking of that stays open for
>>> a long time when everything at the other end has died?
>>>
>>>
>>> You assume connections are kind of stateless but some (proprietary)
>>> protocols requires some closing exchanges which are not only "im leaving".
>>>
>>> For aws i was thinking about starting some services - machines - on the
>>> fly in a pipeline startup and closing them at the end. If teardown is not
>>> called you leak machines and money. You can say it can be done another
>>> way...as the full pipeline ;).
>>>
>>> I dont want to be picky but if beam cant handle its components lifecycle
>>> it can be used at scale for generic pipelines and if bound to some
>>> particular IO.
>>>
>>> What does prevent to enforce teardown - ignoring the interstellar crash
>>> case which cant be handled by any human system? Nothing technically. Why do
>>> you push to not handle it? Is it due to some legacy code on dataflow or
>>> something else?
>>>
>> Teardown *is* already documented and implemented this way (best-effort).
>> So I'm not sure what kind of change you're asking for.
>>
>
> Remove "best effort" from the javadoc. If it is not call then it is a bug
> and we are done :).
>
>
>>
>>
>>> Also what does it mean for the users? Direct runner does it so if a user
>>> udes the RI in test, he will get a different behavior in prod? Also dont
>>> forget the user doesnt know what the IOs he composes use so this is so
>>> impacting for the whole product than he must be handled IMHO.
>>>
>>> I understand the portability culture is new in big data world but it is
>>> not a reason to ignore what people did for years and do it wrong before
>>> doing right ;).
>>>
>>> My proposal is to list what can prevent to guarantee - in the normal IT
>>> conditions - the execution of teardown. Then we see if we can handle it and
>>> only if there is a technical reason we cant we make it
>>> experimental/unsupported in the api. I know spark and flink can, any
>>> unknown blocker for other runners?
>>>
>>> Technical note: even a kill should go through java shutdown hooks
>>> otherwise your environment (beam enclosing software) is fully unhandled and
>>> your overall system is uncontrolled. Only case where it is not true is when
>>> the software is always owned by a vendor and never installed on customer
>>> environment. In this case it belongd to the vendor to handle beam API and
>>> not to beam to adjust its API for a vendor - otherwise all unsupported
>>> features by one runner should be made optional right?
>>>
>>> All state is not about network, even in distributed systems so this is
>>> key to have an explicit and defined lifecycle.
>>>
>>>
>>> Kenn
>>>
>>>
>>>


Re: @TearDown guarantees

2018-02-18 Thread Romain Manni-Bucau
Agreed, Eugene, except that "best effort" means exactly that. It is also often
used to mean "at will", and this is what triggered this thread.

I'm fine using "except if the machine state prevents it", but "best effort"
is too open and can be very badly and wrongly perceived by users (as it was
by me).



2018-02-18 18:13 GMT+01:00 Eugene Kirpichov :

> It will not be called if it's impossible to call it: in the example
> situation you have (intergalactic crash), and in a number of more common
> cases: eg in case the worker container has crashed (eg user code in a
> different thread called a C library over JNI and it segfaulted), JVM bug,
> crash due to user code OOM, in case the worker has lost network
> connectivity (then it may be called but it won't be able to do anything
> useful), in case this is running on a preemptible VM and it was preempted
> by the underlying cluster manager without notice or if the worker was too
> busy with other stuff (eg calling other Teardown functions) until the
> preemption timeout elapsed, in case the underlying hardware simply failed
> (which happens quite often at scale), and in many other conditions.
>
> "Best effort" is the commonly used term to describe such behavior. Please
> feel free to file bugs for cases where you observed a runner not call
> Teardown in a situation where it was possible to call it but the runner
> made insufficient effort.
>
> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau 
> wrote:
>
>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov :
>>
>>>
>>>
>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau 
>>> wrote:
>>>


 On 18 Feb 2018 at 00:23, "Kenneth Knowles" wrote:

 On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <
 rmannibu...@gmail.com> wrote:
>
> If you give an example of a high-level need (e.g. "I'm trying to write
> an IO for system $x and it requires the following initialization and the
> following cleanup logic and the following processing in between") I'll be
> better able to help you.
>
>
> Take a simple example of a transform requiring a connection. Using
> bundles is a perf killer since size is not controlled. Using teardown
> doesnt allow you to release the connection since it is a best effort 
> thing.
> Not releasing the connection makes you pay a lot - aws ;) - or prevents 
> you
> to launch other processings - concurrent limit.
>

 For this example @Teardown is an exact fit. If things die so badly that
 @Teardown is not called then nothing else can be called to close the
 connection either. What AWS service are you thinking of that stays open for
 a long time when everything at the other end has died?


 You assume connections are kind of stateless but some (proprietary)
 protocols requires some closing exchanges which are not only "im leaving".

 For aws i was thinking about starting some services - machines - on the
 fly in a pipeline startup and closing them at the end. If teardown is not
 called you leak machines and money. You can say it can be done another
 way...as the full pipeline ;).

 I dont want to be picky but if beam cant handle its components
 lifecycle it can be used at scale for generic pipelines and if bound to
 some particular IO.

 What does prevent to enforce teardown - ignoring the interstellar crash
 case which cant be handled by any human system? Nothing technically. Why do
 you push to not handle it? Is it due to some legacy code on dataflow or
 something else?

>>> Teardown *is* already documented and implemented this way (best-effort).
>>> So I'm not sure what kind of change you're asking for.
>>>
>>
>> Remove "best effort" from the javadoc. If it is not call then it is a bug
>> and we are done :).
>>
>>
>>>
>>>
 Also what does it mean for the users? Direct runner does it so if a
 user udes the RI in test, he will get a different behavior in prod? Also
 dont forget the user doesnt know what the IOs he composes use so this is so
 impacting for the whole product than he must be handled IMHO.

 I understand the portability culture is new in big data world but it is
 not a reason to ignore what people did for years and do it wrong before
 doing right ;).

 My proposal is to list what can prevent to guarantee - in the normal IT
 conditions - the execution of teardown. Then we see if we can handle it and
 only if there is a technical reason we cant we make it

Re: @TearDown guarantees

2018-02-18 Thread Jean-Baptiste Onofré
Hi,

I think, as you said, it depends on the protocol and the IO.

For instance, in the first version of JdbcIO, I created the connections in
@Setup and released them in @Teardown.

But, in the case of a streaming system, that's not so good (especially for
pooling), as the connection stays open for a very long time.

So, I updated it to deal with the connection in @StartBundle and release it in
@FinishBundle.

So, I think it depends on the kind of connection: the kind actually holding
resources should be managed per bundle (at least for now); the other kind
(just wrapping configuration but not holding resources, like the Apache HTTP
Components client, for instance) can be dealt with in the DoFn lifecycle.

Regards
JB

On 02/18/2018 11:05 AM, Romain Manni-Bucau wrote:
> 
> 
> On 18 Feb 2018 at 00:23, "Kenneth Knowles" wrote:
> 
> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau  > wrote:
> 
> If you give an example of a high-level need (e.g. "I'm trying to
> write an IO for system $x and it requires the following
> initialization and the following cleanup logic and the following
> processing in between") I'll be better able to help you.
> 
> 
> Take a simple example of a transform requiring a connection. Using
> bundles is a perf killer since size is not controlled. Using teardown
> doesnt allow you to release the connection since it is a best effort
> thing. Not releasing the connection makes you pay a lot - aws ;) - or
> prevents you to launch other processings - concurrent limit.
> 
> 
> For this example @Teardown is an exact fit. If things die so badly that
> @Teardown is not called then nothing else can be called to close the
> connection either. What AWS service are you thinking of that stays open 
> for
> a long time when everything at the other end has died?
> 
> 
> You assume connections are kind of stateless but some (proprietary) protocols
> requires some closing exchanges which are not only "im leaving".
> 
> For aws i was thinking about starting some services - machines - on the fly 
> in a
> pipeline startup and closing them at the end. If teardown is not called you 
> leak
> machines and money. You can say it can be done another way...as the full
> pipeline ;).
> 
> I dont want to be picky but if beam cant handle its components lifecycle it 
> can
> be used at scale for generic pipelines and if bound to some particular IO.
> 
> What does prevent to enforce teardown - ignoring the interstellar crash case
> which cant be handled by any human system? Nothing technically. Why do you 
> push
> to not handle it? Is it due to some legacy code on dataflow or something else?
> 
> Also what does it mean for the users? Direct runner does it so if a user udes
> the RI in test, he will get a different behavior in prod? Also dont forget the
> user doesnt know what the IOs he composes use so this is so impacting for the
> whole product than he must be handled IMHO.
> 
> I understand the portability culture is new in big data world but it is not a
> reason to ignore what people did for years and do it wrong before doing right 
> ;).
> 
> My proposal is to list what can prevent to guarantee - in the normal IT
> conditions - the execution of teardown. Then we see if we can handle it and 
> only
> if there is a technical reason we cant we make it experimental/unsupported in
> the api. I know spark and flink can, any unknown blocker for other runners?
> 
> Technical note: even a kill should go through java shutdown hooks otherwise 
> your
> environment (beam enclosing software) is fully unhandled and your overall 
> system
> is uncontrolled. Only case where it is not true is when the software is always
> owned by a vendor and never installed on customer environment. In this case it
> belongd to the vendor to handle beam API and not to beam to adjust its API 
> for a
> vendor - otherwise all unsupported features by one runner should be made
> optional right?
> 
> All state is not about network, even in distributed systems so this is key to
> have an explicit and defined lifecycle.
> 
> 
> Kenn
> 
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com