Re: Introducing a Redistribute transform

2016-10-12 Thread Jean-Baptiste Onofré

Hi Eugene,

Thanks for the update on the mailing list, much appreciated.

Let me take a deeper look at that.

Regards
JB


Re: Introducing a Redistribute transform

2016-10-12 Thread Eugene Kirpichov
So, based on some offline discussion, the problem is more complex. There are
several classes of ultimate user needs which are potentially orthogonal,
even though the current Reshuffle transform, as implemented by the Dataflow
runner, happens to satisfy all of them at the same time:

1. Checkpointing a collection.

Suppose you have something like:

PCollection data = something.apply(ParDo.of(new GenerateFn()));
data.apply(ParDo.of(new WriteFn()));

Suppose GenerateFn is non-deterministic - it can generate different output
on the same input element.
Such Fn's are not forbidden by the Beam model per se, and are obviously
useful (e.g. it might query an external API, or pair each output element
with a random key, etc.).
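
(For concreteness, a minimal sketch of one such DoFn - the element types and
the key-assignment body are assumptions for illustration, not from the PR:)

  // Hypothetical GenerateFn: pairs each input with a freshly generated
  // random key, so re-running it on the same element yields different output.
  // (Uses java.util.concurrent.ThreadLocalRandom and the SDK's DoFn/KV.)
  class GenerateFn extends DoFn<String, KV<Long, String>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      c.output(KV.of(ThreadLocalRandom.current().nextLong(), c.element()));
    }
  }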

The Beam model guarantees that the PCollection "data" will logically consist of
elements produced by *some* sequential execution of GenerateFn on elements
of the PCollection "something" - i.e., even if GenerateFn is invoked on the
same element multiple times and produces different results, only one of
those results will make it into the PCollection. The act of invoking a
DoFn, and the DoFn's side effects, are not part of the Beam model - only
PCollection contents are.

However, these factors can be important. E.g. imagine WriteFn writes data
to a third-party service. Suppose it even does so in an idempotent way
(e.g. suppose the data has a primary key, and WriteFn inserts or overwrites
the row in the database for that primary key) - then each element on which
WriteFn is invoked will appear in the database exactly once.
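
(A sketch of such an idempotent WriteFn - Row and the Database client are
hypothetical stand-ins, assumed serializable:)

  // Hypothetical WriteFn: insert-or-overwrite keyed by the row's primary key,
  // so invoking it twice on the SAME element still leaves exactly one row.
  class WriteFn extends DoFn<Row, Void> {
    private final Database db;  // hypothetical client handle
    WriteFn(Database db) { this.db = db; }

    @ProcessElement
    public void processElement(ProcessContext c) {
      Row row = c.element();
      db.upsert(row.primaryKey(), row);  // idempotent per element
    }
  }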

However, "each element on which WriteFn is invoked" is not the same as
"each element in the PCollection data" - because, again, the Beam model
does not make guarantees about what DoFn's are invoked on.
In particular, in case of failures, WriteFn might be applied arbitrarily
many times to arbitrarily many different results of GenerateFn. Say,
imagine a hypothetical runner that executes the whole pipeline, and on any
failure, it re-executes the whole pipeline - over the course of that, even
if "data" logically has just 1 element, this element may be produced
multiple times and WriteFn might be applied to multiple different versions
of this element, and in the example above, it may insert extra rows into
the database.

Checkpointing can look like this:

  PCollection dataCheckpoint = data.apply(Checkpoint.create());

It limits the scope of non-determinism, and guarantees that an immediate
consumer of the collection "dataCheckpoint" will be invoked *only* on the
committed logical contents of the collection. It may still be invoked
multiple times, but on the same element, so it is sufficient to have
idempotency of side effects at the level of the consumer. Note: the
hypothetical "rerun full pipeline on failure" runner will not be able to
implement this transform correctly.

Reshuffle is used in this capacity in BigQueryIO:
https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L2718
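
(The idiom there is roughly: pair each element with a pseudorandom key,
Reshuffle, then drop the keys. A sketch, with the element type assumed:)

  // After the shuffle, downstream DoFn's observe only the committed contents
  // of "rows", at the cost of materializing the collection.
  PCollection<TableRow> checkpointed = rows
      .apply("AssignRandomKey",
          ParDo.of(new DoFn<TableRow, KV<Integer, TableRow>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
            }
          }))
      .apply(Reshuffle.<Integer, TableRow>of())
      .apply(Values.<TableRow>create());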

2. Preventing a runner "producer-consumer fusion" optimization that would
have limited parallelism.

Suppose "pc" is a collection with 3 elements, and "GenerateFn" generates
1000 elements for each of them.

Then, consider running
pc.apply(ParDo.of(new GenerateFn())).apply(ParDo.of(new ProcessFn())).

Obviously, GenerateFn can at best be applied in parallel to the 3 elements
- no more parallelism can be achieved at this level.
However, ideally ProcessFn would be applied to the 3000 elements all in
parallel - but some runners implement "fusion" where they collapse a
sequence of ParDo's into a single ParDo whose DoFn is the composition of
the component DoFn's. In that case, the pipeline will apply the composition
of GenerateFn and ProcessFn in parallel to 3 elements, achieving a
parallelism of only 3.

This behavior, too, is not part of the Beam model. But we need a transform
that can disable this optimization - e.g. prevent fusion across a
particular PCollection, and guarantee that it is processed with as much
parallelism as if it had been read from a perfectly parallelizable location
(say, an Avro file): we could call it "Redistribute".

pc.apply(ParDo.of(new GenerateFn()))
  .apply(Redistribute.create())
  .apply(ParDo.of(new ProcessFn()))

3. Preventing a runner "sibling fusion" optimization

Suppose FooFn and BarFn are DoFn's, and the pipeline looks like:

PCollection foos = ints.apply(ParDo.of(new FooFn()));
PCollection bars = ints.apply(ParDo.of(new BarFn()));
PCollectionList.of(foos).and(bars)
    .apply(Flatten.pCollections())
    .apply(PubsubIO.Write.to(topic));

In this case, a runner might perform an optimization, fusing FooFn and
BarFn into an Fn that takes an element "x" and produces the concatenation
FooFn(x) + BarFn(x). In some cases, this can be undesirable - e.g. suppose
that BarFn is much slower to compute than FooFn, but the results of FooFn
need to be sent to pubsub as quickly as possible. In that case we don't
want to wait on BarFn's output for an element before FooFn's output for it
is published.

Re: Introducing a Redistribute transform

2016-10-11 Thread Kenneth Knowles
On Tue, Oct 11, 2016 at 10:56 AM Eugene Kirpichov wrote:

> Yeah, I'm starting to lean towards removing Redistribute.byKey() from the
> public API - because it only makes sense for getting access to per-key
> state, and 1) we don't have it yet and 2) runner should insert it
> automatically - so there's no use case for it.


+1 to removing Redistribute.byKey() from the public API.


> The "checkpointing keys" use
> case should be done via Redistribute.arbitrarily(), I believe.
>

Actually I think it really does have to be a GroupByKey followed by writing
the groups without breaking them up:

 - With GBK each element must appear in exactly one output group, so you
have to checkpoint or be able to retract groupings (nice easy explanation
from Thomas Groh; any faults in my paraphrase are my own).

 - But GBK followed by "output the elements one by one" actually removes
this property. Now you can replace the whole thing with a no-op and fuse
with downstream and still get exactly once processing according to the
model but not as observed via side effects to external systems. So sinks
should really be doing that, and I'll retract this use case for
Redistribute.
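
(A sketch of that shape, with concrete types assumed for illustration:)

  // GroupByKey checkpoints the grouping; immediately breaking the groups
  // back up removes the "each element in exactly one group" property.
  static PCollection<KV<String, Integer>> groupAndUngroup(
      PCollection<KV<String, Integer>> keyed) {
    return keyed
        .apply(GroupByKey.<String, Integer>create())
        .apply(ParDo.of(
            new DoFn<KV<String, Iterable<Integer>>, KV<String, Integer>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                for (Integer v : c.element().getValue()) {
                  c.output(KV.of(c.element().getKey(), v));
                }
              }
            }));
  }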

As for Redistribute.arbitrarily():
> In a batch runner, we could describe it as "identity transform, but runner
> is required to process the resulting PCollection with downstream transforms
> as well as if it had been created from elements via Create.of(), in terms
> of ability to parallelize processing and minimize amount of re-executed
> work in case of failures" (which is a fancy way of saying "materialize"
> without really saying "materialize" :) ).
>

How can you observe if the runner ignored you?


Re: Introducing a Redistribute transform

2016-10-11 Thread Eugene Kirpichov
Yeah, I'm starting to lean towards removing Redistribute.byKey() from the
public API - because it only makes sense for getting access to per-key
state, and 1) we don't have it yet and 2) runner should insert it
automatically - so there's no use case for it. The "checkpointing keys" use
case should be done via Redistribute.arbitrarily(), I believe.

As for Redistribute.arbitrarily():
In a batch runner, we could describe it as "identity transform, but runner
is required to process the resulting PCollection with downstream transforms
as well as if it had been created from elements via Create.of(), in terms
of ability to parallelize processing and minimize amount of re-executed
work in case of failures" (which is a fancy way of saying "materialize"
without really saying "materialize" :) ).
However, this reduction to Create.of() makes less sense in the unified
model, even though again the goal is to get the best-possible parallelism
and minimized re-execution.


Re: Introducing a Redistribute transform

2016-10-11 Thread Robert Bradshaw
Actually, Redistribute.perKey seems a bit dangerous, as there's no
guarantee the partitioning is persisted to any subsequent steps, and
we don't have a concrete notion of key-partitioned elements outside of
GBK in the model. I suspect it was only introduced because that's what
Redistribute.arbitrarily() is built on.


Re: Introducing a Redistribute transform

2016-10-11 Thread Ben Chambers
As Kenn points out, I think the nature of the Redistribute operation is to
act as a hint (or requirement) to the runner that a certain distribution of
the elements is desirable. In a perfect world this wouldn't be necessary
because every runner would be able to do exactly the right thing. Looking
at the different use cases may be helpful:

1. Redistribute.arbitrarily being used in IO as a fusion break and
checkpoint. We could express this as a hint saying that we'd like to
persist the PCollection at this point.
2. Redistribute.perKey being used to checkpoint keys in a keyed
PCollection. I think this could be the same as the previous hint or a
variant thereof.
3. Redistribute.perKey to ensure that the elements are distributed across
machines such that all elements with a specific key are on the same
machine. This should only be necessary for per-key processing (such as
state) and can be added by the runner when necessary (becomes easier once
we have a notion of transforms that preserve key-partitioning, etc.)

Of these 1 and 2 seem to be the most interesting. The hint can be
implemented in various ways -- a transform that represents the hint (and
the runner can then implement as it sees fit) or via a method that sets
some property on the PCollection, to which the runner could choose to apply
a transform. I lean towards the former (keeping this as a transform) since
it fits more naturally into the codebase and doesn't require extending
PCollection (something we avoid).

What if this was something like: ".apply(Hints.checkpoint())" or
".apply(Hints.break())"? This makes it clearer that this is a hint to the
runner and not part of the semantics?
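
(One minimal way to sketch the "hint as a transform" idea - semantically the
identity, published under a well-known step name that a runner may recognize
and replace; the body here is an assumption, and since "break" is a reserved
word in Java, a real API would need another name, e.g. Hints.fusionBreak():)

  class Hints {
    // Semantically the identity; a runner that recognizes the step name
    // "Hints.checkpoint" may checkpoint there, others run it as a no-op.
    static <T> DoFn<T, T> checkpoint() {
      return new DoFn<T, T>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(c.element());
        }
      };
    }
  }

  // usage: pc.apply("Hints.checkpoint", ParDo.of(Hints.<Foo>checkpoint()));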



Re: Introducing a Redistribute transform

2016-10-11 Thread Kenneth Knowles
On Mon, Oct 10, 2016 at 1:38 PM Eugene Kirpichov wrote:

> The transform, the way it's implemented, actually does several things at
> the same time and that's why it's tricky to document it.
>

This thread has actually made me less sure about my thoughts on this
transform. I do know what the transform is about and I do think we need it.
But I don't know that it can be explained "within the model". Look at our
classic questions about Redistribute.arbitrarily() and Redistribute.byKey():

 - "what" is it computing? The identity on its input.
 - "where" is the event time windowing? Same as its input.
 - "when" is output produced? As fast as reasonable (runner-specific).
 - "how" are refinements related? Same as its input (I think this might
actually be incorrect if accumulating fired panes)

These points don't describe any of the real goals of Redistribute. Hence
describing it in terms of fusion and checkpointing, which are quite
runner-specific in their (optional) manifestations.

> - Introduces a fusion barrier (in runners that have it), making sure that
> the runner can fully parallelize processing the output PCollection with
> DoFn's
>

Can a runner introduce other fusion barriers whenever it wants? Yes.
Can a runner ignore a proposed fusion barrier? Yes. (or when can it not?
why not?)


> - Introduces a fault-tolerance barrier, effectively "checkpointing" the
> input PCollection (again, in runners where it makes sense) and making sure
> that processing elements of the output PCollection with a DoFn, if the DoFn
> fails, will redo only that processing, but not need to recompute the input
> PCollection.
>

Can a runner introduce a checkpoint whenever appropriate? Yes.
Can a runner ignore a hint to checkpoint? Yes (if it can still compute the
same result - it may not even conceive of checkpointing in a compatible
way).

> - All of the above and also makes the collection "key-partitioned", giving
> access to per-key state to downstream key-preserving DoFns. However, this
> is also runner-specific, because it's conceivable that a runner might not
> need this "key-partitioned" property (in fact it's best if a runner
> inserted such a "redistribute by key" automatically if it needs it...), and
> it currently isn't exposed anyway.
>

Agreed. The runner should insert the necessary keying wherever needed. One
might say the same for other uses of Redistribute, but in practice hints
are useful.


> Still thinking about the best way to describe this in a way that's least
> confusing to users.
>

I think it isn't just about users. I don't think the transform is quite
well-defined at the "what the runner must do" level. Here is a question I
am considering: when is it _incorrect_ for a runner to replace a
Redistribute with an identity transform? I have some thoughts, such as
committing pseudorandomly generated data, but do you have some other ideas?
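
(One way to make the "committing pseudorandomly generated data" case
concrete, reusing the random-key example from earlier in the thread - the
exact Redistribute signature and the step names are assumptions:)

  // AssignId is non-deterministic. If a runner replaced Redistribute with
  // the identity and a downstream failure forced re-execution, WriteFn could
  // observe a DIFFERENT id for the same logical element - so eliding the
  // Redistribute here would be incorrect.
  rows
      .apply("AssignId", ParDo.of(new DoFn<Row, KV<String, Row>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(KV.of(UUID.randomUUID().toString(), c.element()));
        }
      }))
      .apply(Redistribute.<KV<String, Row>>arbitrarily())
      .apply("Write", ParDo.of(new WriteFn()));  // side-effecting writer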

Kenn


Re: Introducing a Redistribute transform

2016-10-11 Thread Amit Sela
Thanks Eugene, I've added a few more questions inline.

So this is about rebalancing/autoscaling ?
When would you expect shards/bundles/partitions ( this language is becoming
extremely overloaded ;-) ) to be out-of-balance ?
I can think of:

   - After reading from IOs - IOs don't necessarily guarantee they'd be
   balanced, or parallelized according to the Pipeline's parallelism.
   - After "Key"ing / "Re-Key"ing / "Un-Key"ing - which could lead to
   downstream aggregations to be unbalanced.
   - After Flatten.

Anything else ?

This is NOT about Fanning in/out, correct ? hence the name Re*distribute*,
meaning across existing parallelism ?

Thanks,
Amit

On Mon, Oct 10, 2016 at 11:38 PM Eugene Kirpichov wrote:

> Hi Amit,
>
> The transform, the way it's implemented, actually does several things at
> the same time and that's why it's tricky to document it.
>
> Redistribute.arbitrarily():
> - Introduces a fusion barrier (in runners that have it), making sure that
> the runner can fully parallelize processing the output PCollection with
> DoFn's
> - Introduces a fault-tolerance barrier, effectively "checkpointing" the
> input PCollection (again, in runners where it makes sense) and making sure
> that processing elements of the output PCollection with a DoFn, if the DoFn
> fails, will redo only that processing, but not need to recompute the input
> PCollection.

Like caching an RDD with Spark ? so that downstream computations won't
repeat the redistribution ?

> Redistribute.byKey():
> - All of the above and also makes the collection "key-partitioned", giving
> access to per-key state to downstream key-preserving DoFns. However, this
> is also runner-specific, because it's conceivable that a runner might not
> need this "key-partitioned" property (in fact it's best if a runner
> inserted such a "redistribute by key" automatically if it needs it...), and
> it currently isn't exposed anyway.

Isn't this the actual redistribution, while "arbitrarily" adds
pseudo-random keys to non-keyed records ?
As for per-key state, this is more for "Autoscaling" mid-pipeline (maybe
after a stateful operator which is tied to the key), right ?

> Still thinking about the best way to describe this in a way that's least
> confusing to users.
>
> Regarding redistributing into N shards: this is problematic because it
> doesn't seem to make sense in the unified model (in streaming in particular
> - having N keys doesn't mean you have N bundles), and breaks down if you
> add dynamic work rebalancing, backups and other magic. So I decided not to
> bother with this in that PR.
>
> Agreed with Robert that limiting the parallelism, or throttling, are very
> useful features, but Redistribute is not the right place to introduce them.

You're right, it fits around here somewhere (logically) but not as part of
redistribute.


Re: Introducing a Redistribute transform

2016-10-10 Thread Eugene Kirpichov
Hi Amit,

The transform, the way it's implemented, actually does several things at
the same time and that's why it's tricky to document it.

Redistribute.arbitrarily():
- Introduces a fusion barrier (in runners that have it), making sure that
the runner can fully parallelize processing the output PCollection with
DoFn's
- Introduces a fault-tolerance barrier, effectively "checkpointing" the
input PCollection (again, in runners where it makes sense) and making sure
that processing elements of the output PCollection with a DoFn, if the DoFn
fails, will redo only that processing, but not need to recompute the input
PCollection.
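
(For example, the IO pattern from the original announcement - split a query
into parts, then break fusion so the parts are read in parallel;
SplitQueryFn, ReadPartFn, the element types, and the exact Redistribute
signature are assumptions:)

  // "query" is an assumed String describing the full read.
  PCollection<Record> records = p
      .apply(Create.of(query))
      .apply("SplitQuery", ParDo.of(new SplitQueryFn()))  // few elements
      .apply(Redistribute.<QueryPart>arbitrarily())       // fusion barrier here
      .apply("ReadPart", ParDo.of(new ReadPartFn()));     // now reads in parallel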

Redistribute.byKey():
- All of the above and also makes the collection "key-partitioned", giving
access to per-key state to downstream key-preserving DoFns. However, this
is also runner-specific, because it's conceivable that a runner might not
need this "key-partitioned" property (in fact it's best if a runner
inserted such a "redistribute by key" automatically if it needs it...), and
it currently isn't exposed anyway.

Still thinking about the best way to describe this in a way that's least
confusing to users.

Regarding redistributing into N shards: this is problematic because it
doesn't seem to make sense in the unified model (in streaming in particular
- having N keys doesn't mean you have N bundles), and breaks down if you
add dynamic work rebalancing, backups and other magic. So I decided not to
bother with this in that PR.

Agreed with Robert that limiting the parallelism, or throttling, are very
useful features, but Redistribute is not the right place to introduce them.


Re: Introducing a Redistribute transform

2016-10-10 Thread Robert Bradshaw
On Mon, Oct 10, 2016 at 12:57 PM, Amit Sela wrote:

>> > How about "FanIn" ?
>>
>> Could you clarify what you would hope to use this for?
>>
> Well, what if for some reason I would want to limit parallelism for a step
> in the Pipeline ? like calling an external service without "DDoS"ing it ?

I think this is more difficult to enforce without runner-specific
support. For example, if one writes

input.apply(Redistribute(N)).apply(ParDo(...))

one is assuming that fusion takes place such that the subsequent ParDo
doesn't happen to get processed by more-than-expected shards. It's
also much simpler to spread the elements out among 2^64 keys than to
spread them over some small N of keys, and choosing exactly N keys isn't
necessarily the best way to enforce parallelism constraints (as this
would likely introduce stragglers). One typically wants to reduce
parallelism over a portion (interval?) of a pipeline, whereas
redistribution operates at a point in your pipeline.
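
(A sketch of the two keying strategies, using WithKeys; T stands for the
element type and N is a placeholder constant:)

  // ~2^64 keys: each element gets a fresh pseudorandom key, so the shuffle
  // can spread elements over however many workers exist.
  input.apply(WithKeys.of(new SerializableFunction<T, Long>() {
    public Long apply(T x) { return ThreadLocalRandom.current().nextLong(); }
  }));

  // Exactly N keys: at most N groups, so at most N workers see data, and a
  // single slow group becomes a straggler.
  input.apply(WithKeys.of(new SerializableFunction<T, Integer>() {
    public Integer apply(T x) { return ThreadLocalRandom.current().nextInt(N); }
  }));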

I agree that being able to limit parallelism (possibly dynamically
based on pushback from an external service, or noting that throughput
is no longer scaling linearly) would be a useful feature to have, but
that's a bit out of scope here.


Re: Introducing a Redistribute transform

2016-10-10 Thread Amit Sela
On Mon, Oct 10, 2016 at 9:21 PM Robert Bradshaw wrote:

> > How about "FanIn" ?
>
> Could you clarify what you would hope to use this for?
>
Well, what if for some reason I would want to limit parallelism for a step
in the Pipeline ? like calling an external service without "DDoS"ing it ?



Re: Introducing a Redistribute transform

2016-10-10 Thread Robert Bradshaw
On Sat, Oct 8, 2016 at 7:31 AM, Amit Sela wrote:
> Hi Eugene,
>
> This is very interesting.
> Let me see if I get this right: the "Redistribute" transformation assigns
> a "running id" key (per-bundle), calls "Redistribute.byKey", and extracts
> back the values, correct ?

The keys are (pseudorandomly) unique per element.

> As for "Redistribute.byKey" - it's made of a GroupByKey transformation that
> follows a Window transformation that neutralises the "resolution" of
> triggers and panes that usually occurs in GroupByKey, correct ?
>
> So this is basically a "FanOut" transformation which will depend on the
> available resources of the runner (and the uniqueness of the assigned keys)
> ?
>
> Would we want to Redistribute into a user-defined number of bundles (>
> current) ?

I don't think there's any advantage to letting the user specify a
number here; the data is spread out among as many machines as are
handling the shuffling (for N elements, there are ~N unique keys,
which get partitioned by the system to the M workers).

> How about "FanIn" ?

Could you clarify what you would hope to use this for?



Re: Introducing a Redistribute transform

2016-10-08 Thread Amit Sela
Hi Eugene,

This is very interesting.
Let me see if I get this right: the "Redistribute" transformation assigns
a "running id" key (per-bundle), calls "Redistribute.byKey", and extracts
back the values, correct ?
As for "Redistribute.byKey" - it's made of a GroupByKey transformation that
follows a Window transformation that neutralises the "resolution" of
triggers and panes that usually occurs in GroupByKey, correct ?

So this is basically a "FanOut" transformation which will depend on the
available resources of the runner (and the uniqueness of the assigned
keys) ?

Would we want to Redistribute into a user-defined number of bundles (>
current) ?

How about "FanIn" ?

Thanks,
Amit


On Fri, Oct 7, 2016 at 10:49 PM Eugene Kirpichov wrote:

> Hello,
>
> Heads up that https://github.com/apache/incubator-beam/pull/1036 will
> introduce a transform called "Redistribute", encapsulating a relatively
> common pattern - a "fusion break" [see
>
> https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
> previously
> providing advice on that] - useful e.g. when you write an IO as a sequence
> of ParDo's: split a query into parts, read each part, and you want to
> prevent fusing these ParDo's because that would make the whole thing
> execute sequentially, and in other similar cases.
>
> The PR also uses it, as an example, in DatastoreIO and JdbcIO, both of
> which used to have a hand-rolled implementation of the same. The Write
> transform has something similar, but not quite identical, so I skipped it.
>
> This is not a model change - merely providing a common implementation of
> something useful that already existed but was scattered across the
> codebase.
>
> Redistribute also subsumes the old mostly-internal Reshuffle transform via
> Redistribute.byKey().
>
> I tried finding more cases in the Beam codebase that have an ad-hoc
> implementation of this; I did not find any, but I might have missed
> something. I suppose the transform will need to be advertised in
> documentation on best-practices for connector development; perhaps some
> StackOverflow answers should be updated; any other places?
>